/
trend.go
159 lines (128 loc) · 4.74 KB
/
trend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package relaxting
//GetTrendCompliancyNodes - Report #11
import (
"context"
"fmt"
"sort"
"time"
elastic "github.com/olivere/elastic/v7"
"github.com/pkg/errors"
"github.com/chef/automate/api/interservice/compliance/stats"
"github.com/chef/automate/components/compliance-service/reporting/util"
)
// TrendBucket describes one time window of a trend graph.
type TrendBucket struct {
	// StartTime and EndTime are the lower and upper bounds of the window.
	// A nil StartTime marks a bucket that has no lower bound.
	StartTime, EndTime *time.Time
}
// getTrendBucketsRoundDown builds the time buckets that cover the range between
// the "start_time" and "end_time" filters, sorted by ascending EndTime.
//
// interval is expressed in seconds and is rounded down to whole days. Buckets
// are laid out backwards from end_time, so the last bucket always ends exactly
// at end_time. The first bucket gets a nil StartTime to indicate that it is
// open on the left.
//
// An error is returned when either filter value (as picked by firstOrEmpty) is
// not a valid RFC3339 timestamp, or when interval is shorter than one day.
func (backend ES2Backend) getTrendBucketsRoundDown(filters map[string][]string, interval int) ([]TrendBucket, error) {
	startTime, err := time.Parse(time.RFC3339, firstOrEmpty(filters["start_time"]))
	if err != nil {
		return nil, fmt.Errorf("GetTrendBuckets, cannot parse start_time: %w", err)
	}
	endTime, err := time.Parse(time.RFC3339, firstOrEmpty(filters["end_time"]))
	if err != nil {
		return nil, fmt.Errorf("GetTrendBuckets, cannot parse end_time: %w", err)
	}

	// A step of zero days would never move the cursor below and a negative one
	// would move it away from start_time; either way the loop would not end.
	days := interval / 86400
	if days < 1 {
		return nil, fmt.Errorf("GetTrendBuckets, interval must be at least 86400 seconds, got %d", interval)
	}

	trendBuckets := make([]TrendBucket, 0)
	for bucketEnd := endTime; bucketEnd.After(startTime); {
		// Fresh variables on every iteration so each bucket owns its pointers.
		bucketStart := bucketEnd.AddDate(0, 0, -days)
		bucketEndCopy := bucketEnd
		trendBuckets = append(trendBuckets, TrendBucket{StartTime: &bucketStart, EndTime: &bucketEndCopy})
		bucketEnd = bucketStart
	}

	// Buckets were produced newest first; callers expect them oldest first.
	sort.Slice(trendBuckets, func(i, j int) bool {
		return trendBuckets[i].EndTime.Before(*trendBuckets[j].EndTime)
	})

	// The first bucket to the left has no start time.
	if len(trendBuckets) > 0 {
		trendBuckets[0].StartTime = nil
	}
	return trendBuckets, nil
}
// GetTrend returns the data points of either a nodes or a controls trend graph.
//
// trendType must be "controls" or "nodes"; any other value is rejected before
// any query is issued. The Elasticsearch query aggregates the matching
// documents into one-day buckets on "end_time". The returned slice holds one
// entry per daily bucket computed from the "start_time"/"end_time" filters;
// buckets for which the query produced nothing are filled with a zeroed
// stats.Trend carrying only the report time.
//
// NOTE(review): the interval argument is not read in this function; both the
// aggregation ("1d") and the bucket computation (86400 seconds) are fixed to
// one day. Confirm with callers whether that is intended.
func (backend ES2Backend) GetTrend(filters map[string][]string, interval int, trendType string) ([]*stats.Trend, error) {
	const fnName = "GetTrend"
	defer util.TimeTrack(time.Now(), fnName)

	// The slice that is eventually handed back to the caller.
	trends := make([]*stats.Trend, 0)

	switch trendType {
	case "controls", "nodes":
		// supported trend types
	default:
		return trends,
			errors.New(fmt.Sprintf("%s supports either trendType of 'controls' or 'nodes' but you passed in %s",
				fnName, trendType))
	}

	depth, err := backend.NewDepth(filters, FetchLatestDataOrNot(filters))
	if err != nil {
		return trends, errors.Wrap(err, fmt.Sprintf("%s unable to get depth level for report", fnName))
	}
	queryInfo := depth.getQueryInfo()

	// Daily histogram on end_time, decorated with the depth-specific sub-aggregations.
	dailyHistogram := elastic.NewDateHistogramAggregation().Interval("1d").Field("end_time")
	for name, subAgg := range depth.getTrendAggs(trendType, filters) {
		dailyHistogram.SubAggregation(name, subAgg)
	}

	// Size(0): only the aggregation is requested, no document hits.
	searchSource := elastic.NewSearchSource().
		Query(queryInfo.filtQuery).
		Aggregation("trend_buckets", dailyHistogram).
		Size(0)

	rawSource, err := searchSource.Source()
	if err != nil {
		return trends, err
	}

	esClient, err := backend.ES2Client()
	if err != nil {
		return trends, errors.Wrapf(err, "%s cannot connect to elasticsearch", fnName)
	}

	LogQueryPartMin(queryInfo.esIndex, rawSource, fmt.Sprintf("%s query searchSource", fnName))

	searchResult, err := esClient.Search().
		SearchSource(searchSource).
		Index(queryInfo.esIndex).
		Do(context.Background())
	if err != nil {
		return trends, errors.Wrapf(err, "%s cannot get result from elasticsearch", fnName)
	}

	trendsByDate, err := depth.getTrendResults(trendType, searchResult)
	if err != nil {
		return trends, errors.Wrapf(err, "%s cannot get trendResults", fnName)
	}

	// These buckets are used only for their time boundaries.
	dayBuckets, err := backend.getTrendBucketsRoundDown(filters, 86400)
	if err != nil {
		return nil, errors.New(fmt.Sprintf("%s returned error: %s", fnName, err.Error()))
	}
	if len(dayBuckets) == 0 {
		return trends, nil
	}

	for i := range dayBuckets {
		reportTime := dayBuckets[i].EndTime.Format(time.RFC3339)
		trend := trendsByDate[reportTime]
		if trend == nil {
			// No result for this bucket: emit an all-zero entry for it.
			trend = &stats.Trend{ReportTime: reportTime}
		}
		trends = append(trends, trend)
	}

	LogQueryPartMin(queryInfo.esIndex, searchResult, fmt.Sprintf("%s query searchResult", fnName))
	return trends, nil
}
// stringArrayToInterfaceArray copies a slice of strings into a new slice of
// empty-interface values of the same length, preserving order. The result is
// never nil, even for a nil or empty input.
func stringArrayToInterfaceArray(stringArrayToConvert []string) []interface{} {
	converted := make([]interface{}, 0, len(stringArrayToConvert))
	for _, s := range stringArrayToConvert {
		converted = append(converted, s)
	}
	return converted
}