forked from raystack/meteor
/
stats.go
128 lines (103 loc) · 2.7 KB
/
stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package auditlog
import (
"fmt"
"github.com/goto/meteor/plugins/extractors/bigquery/sqlparser"
"github.com/pkg/errors"
)
// TableStats accumulates per-table statistics from the audit logs handed to
// Populate. Use NewTableStats to obtain a value whose maps are initialized.
type TableStats struct {
	// TableUsage maps a table URN to the number of populated logs that
	// referenced it.
	TableUsage map[string]int64
	// JoinDetail maps a table URN to the other table URNs referenced in the
	// same log, each with its own JoinDetail record.
	JoinDetail map[string]map[string]JoinDetail
	// FilterConditions maps a table URN to the set of filter condition
	// strings recorded for it.
	FilterConditions map[string]map[string]bool
	// processedLog is the log passed to the most recent Populate call.
	processedLog *LogData
}
// JoinDetail describes how one table was referenced together with another.
type JoinDetail struct {
	// Usage counts how many times the pair of tables was recorded together.
	Usage int64
	// Conditions is the set of join condition strings recorded for the pair;
	// a condition is present when its key maps to true.
	Conditions map[string]bool
}
// NewTableStats returns a TableStats whose table-usage, join-detail and
// filter-condition maps have been initialized to empty maps.
func NewTableStats() *TableStats {
	stats := new(TableStats)
	stats.initPopulate()
	return stats
}
// initPopulate replaces every exported statistics map with a fresh, empty
// map. Anything previously accumulated in those maps is dropped.
func (b *TableStats) initPopulate() {
	b.TableUsage = make(map[string]int64)
	b.JoinDetail = make(map[string]map[string]JoinDetail)
	b.FilterConditions = make(map[string]map[string]bool)
}
// Populate folds a single audit log into the accumulated statistics.
//
// The log is remembered as the most recently processed one, then every
// referenced table URN has its usage counter, join details and filter
// conditions updated. Join and filter conditions are parsed from the log's
// SQL query.
//
// It returns an error when the log references no tables, or when the query
// cannot be obtained from the log; in both cases no statistics are changed.
func (b *TableStats) Populate(ld *LogData) error {
	b.processedLog = ld

	// A log without referenced tables does not involve any table at all.
	tables := ld.GetReferencedTablesURN()
	if len(tables) == 0 {
		return errors.New("empty referenced tables")
	}

	// The query is mandatory; a log without one is not valid.
	query, err := ld.GetQuery()
	if err != nil {
		return fmt.Errorf("get query: %w", err)
	}

	joinConds := sqlparser.ParseJoinConditions(query)
	filterConds := sqlparser.ParseFilterConditions(query)

	// A join needs at least two referenced tables.
	hasJoin := len(tables) > 1

	for _, urn := range tables {
		b.populateTableUsage(urn)
		if hasJoin {
			b.populateJoinDetail(urn, tables, joinConds)
		}
		b.populateFilterConditions(urn, filterConds)
	}

	return nil
}
// populateTableUsage increments the usage counter of tableURN by one.
//
// The counter map is created on demand so the method also works on a
// TableStats that was not built through NewTableStats.
func (b *TableStats) populateTableUsage(tableURN string) {
	if b.TableUsage == nil {
		b.TableUsage = make(map[string]int64)
	}
	// A missing key reads as zero, so no explicit existence check is needed.
	b.TableUsage[tableURN]++
}
// populateJoinDetail records that tableURN was referenced together with every
// other table in refTablesURN.
//
// For each other table the pair's Usage is incremented by one and, when jcs
// is non-empty, every join condition in jcs is added to the pair's Conditions
// set. Entries equal to tableURN itself are skipped. An (initially empty)
// entry for tableURN is created even if no other table is present.
//
// Missing maps are created on demand, so the method also works on a
// TableStats that was not built through NewTableStats.
func (b *TableStats) populateJoinDetail(tableURN string, refTablesURN, jcs []string) {
	if b.JoinDetail == nil {
		b.JoinDetail = make(map[string]map[string]JoinDetail)
	}
	joined := b.JoinDetail[tableURN]
	if joined == nil {
		joined = make(map[string]JoinDetail)
		b.JoinDetail[tableURN] = joined
	}

	for _, other := range refTablesURN {
		if other == tableURN {
			continue
		}

		// The zero JoinDetail has Usage 0, so first use and later uses are
		// both a plain increment.
		jd := joined[other]
		jd.Usage++

		// Only allocate the condition set when there is something to store,
		// leaving Conditions nil for pairs never seen with a join condition.
		if len(jcs) > 0 && jd.Conditions == nil {
			jd.Conditions = make(map[string]bool, len(jcs))
		}
		for _, jc := range jcs {
			jd.Conditions[jc] = true
		}

		// JoinDetail is stored by value; write the updated copy back once.
		joined[other] = jd
	}
}
// populateFilterConditions adds every filter condition in fcs to the set of
// conditions recorded for tableURN.
//
// When fcs is empty nothing is recorded and no entry is created for
// tableURN. Missing maps are created on demand, so the method also works on a
// TableStats that was not built through NewTableStats.
func (b *TableStats) populateFilterConditions(tableURN string, fcs []string) {
	if len(fcs) == 0 {
		return
	}

	if b.FilterConditions == nil {
		b.FilterConditions = make(map[string]map[string]bool)
	}
	// Look the per-table set up once instead of on every iteration.
	conds := b.FilterConditions[tableURN]
	if conds == nil {
		conds = make(map[string]bool, len(fcs))
		b.FilterConditions[tableURN] = conds
	}

	for _, fc := range fcs {
		conds[fc] = true
	}
}