/
ingest_call.go
187 lines (167 loc) · 6.6 KB
/
ingest_call.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package ingesttest
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"strconv"
"strings"
"time"
"github.com/gofrs/uuid"
"github.com/golang/protobuf/jsonpb"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"path/filepath"
"github.com/chef/automate/api/interservice/compliance/ingest/events/compliance"
"github.com/chef/automate/api/interservice/compliance/ingest/ingest"
"github.com/chef/automate/components/compliance-service/examples/helpers"
)
// maxSize is the upper bound, in bytes (1 MiB), for the report content carried by a
// single ingest.ReportData message on the ingest stream.
const maxSize = 1024 * 1024
// SendReportToGRPC reads the JSON compliance report stored in `file` and hands it to the
// ingest service at 127.0.0.1:10121 over gRPC.
//
// The report is sent `reportsPerThread` times by each of `threadCount` goroutines. As soon as
// more than one report is sent in total, every copy gets fresh ids and a fresh end_time
// (see SendUniqueReport).
//
// A file whose base name starts with "NOW" is a dynamic report: its end_time is recomputed
// relative to the current time from the file name, which has to look like
// NOW_MINUS_<minutes>_<MINUS|PLUS>_<minutes>_<rest>.json. A malformed dynamic file name ends
// the process through logrus.Fatalf.
//
// An error is returned when threadCount is negative, the file cannot be opened, the gRPC
// connection cannot be established, the report cannot be unmarshalled or no client could be
// created. Failures of individual sends are only logged by SendUniqueReport; they are not
// returned here.
func SendReportToGRPC(file string, threadCount int, reportsPerThread int) error {
	if threadCount < 0 {
		// A negative count would make the progress slice allocation further down panic.
		return fmt.Errorf("ingest_call.go - threadCount must not be negative, got %d", threadCount)
	}

	fileData, err := os.Open(file)
	if err != nil {
		logrus.Errorf("ingest_call.go - Unable to read file: %s", file)
		return err
	}
	defer fileData.Close()

	connFactory := helpers.SecureConnFactory()
	reportEndpoint := fmt.Sprintf("%s:%d", "127.0.0.1", 10121)
	logrus.Infof("Handing over report to ingest service %s", reportEndpoint)

	// One deadline covers dialing and every report sent afterwards.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
	defer cancel()

	conn, err := connFactory.DialContext(ctx, "compliance-service", reportEndpoint, grpc.WithBlock())
	if err != nil {
		logrus.Errorf("ingest_call.go - Unable grpc dial: %s", err)
		return err
	}
	logrus.Infof("Connection to GRPC successful, sending report(s) now...")
	defer conn.Close()

	var iReport compliance.Report
	unmarshaler := &jsonpb.Unmarshaler{AllowUnknownFields: true}
	if err = unmarshaler.Unmarshal(fileData, &iReport); err != nil {
		logrus.Errorf("ingest_call.go - reportToGRPC was unable to unmarshal the report output into a compliance.Report struct: %s", err.Error())
		return err
	}

	jsonFile := filepath.Base(file)
	// The report filename convention is used to change the end_time of the report based on
	// the current date and time. This is needed to test the "last 24 hours" functionality of
	// Automate when an end_time is not specified for API calls.
	if strings.HasPrefix(jsonFile, "NOW") {
		parts := strings.Split(jsonFile, "_")
		if len(parts) < 6 {
			logrus.Fatalf("Dynamic report file (%s) is invalid. starting with NOW must have at least 5 underscores. Example: NOW_MINUS_1440_PLUS_0060_osx(2)-omega-pro1(p)-passed.json", jsonFile)
		}
		newEndDate := time.Now()
		if parts[1] == "MINUS" {
			firstMinutes, err := strconv.Atoi(parts[2])
			if err != nil {
				logrus.Fatalf("Can't convert '%s' to integer, aborting report %s", parts[2], jsonFile)
			}
			secondMinutes, err := strconv.Atoi(parts[4])
			if err != nil {
				logrus.Fatalf("Can't convert '%s' to integer, aborting report %s", parts[4], jsonFile)
			}
			newEndDate = newEndDate.Add(-time.Duration(firstMinutes) * time.Minute)
			switch parts[3] {
			case "MINUS":
				newEndDate = newEndDate.Add(-time.Duration(secondMinutes) * time.Minute)
			case "PLUS":
				if firstMinutes == 1440 && secondMinutes == 1 {
					// Need less than a minute, otherwise some tests run before this minute
					// expires, creating flaky tests.
					newEndDate = newEndDate.Add(15 * time.Second)
				} else {
					newEndDate = newEndDate.Add(time.Duration(secondMinutes) * time.Minute)
				}
			default:
				logrus.Fatalf("Don't understand operand %s when processing dynamic report %s. Only MINUS and PLUS are supported.", parts[3], jsonFile)
			}
			iReport.EndTime = newEndDate.UTC().Format(time.RFC3339)
			logrus.Infof("[%s] Ingesting dynamic report (%s) with updated end_time (%s)", time.Now().UTC().Format(time.RFC3339), iReport.ReportUuid, iReport.EndTime)
		} else {
			logrus.Fatalf("'%s' not supported after NOW, only 'MINUS', aborting file %s", parts[1], jsonFile)
		}
	}

	client := ingest.NewComplianceIngesterServiceClient(conn)
	if client == nil {
		// err is nil at this point, so a dedicated error is needed to report the failure.
		err = fmt.Errorf("NewComplianceIngesterServiceClient returned a nil client")
		logrus.Errorf("ingest_call.go - reportToGRPC got nil for NewComplianceIngesterClient: %s", err)
		return err
	}
	logrus.Info("-------------------------------------------------------")

	deadline, _ := ctx.Deadline()
	// A single report is ingested unchanged; copies are made unique by SendUniqueReport.
	onesie := threadCount <= 1 && reportsPerThread <= 1
	logrus.Infof("Ingest context deadline=%s", deadline.UTC().Format(time.RFC3339))

	// SendThreadedReports records its progress in this slice. It is not read here while the
	// workers are running; completion is signalled through the done channel instead.
	threadsProgress := make([]int, threadCount)
	done := make(chan struct{}, threadCount)
	for i := 1; i <= threadCount; i++ {
		go func(threadNr int) {
			defer func() { done <- struct{}{} }()
			SendThreadedReports(iReport, onesie, threadNr, reportsPerThread, threadsProgress, client, ctx)
		}(i)
	}

	// Waiting for all threads to send the reports
	for i := 0; i < threadCount; i++ {
		<-done
	}
	return nil
}
// SendThreadedReports sends `reportsPerThread` reports, one after the other, on behalf of
// thread `threadNr`. Thread numbers are 1-based: after every report the number of reports
// handled so far is stored in threadsProgress[threadNr-1].
func SendThreadedReports(iReport compliance.Report, onesie bool, threadNr int, reportsPerThread int, threadsProgress []int, client ingest.ComplianceIngesterServiceClient, ctx context.Context) {
	slot := threadNr - 1
	for reportNr := 1; reportNr <= reportsPerThread; reportNr++ {
		SendUniqueReport(iReport, onesie, threadNr, reportNr, client, ctx)
		threadsProgress[slot] = reportNr
	}
}
// SendUniqueReport sends one report over a ProcessComplianceReport client stream.
// Unless `onesie` is set, the ReportUuid, NodeUuid and EndTime of the report are replaced
// first; iReport is received by value, so the caller's report is left untouched.
// The report is JSON-encoded and streamed in chunks of at most maxSize bytes.
// It measures the execution time and logs success or failure; nothing is returned.
func SendUniqueReport(iReport compliance.Report, onesie bool, threadNr int, reportNr int, client ingest.ComplianceIngesterServiceClient, ctx context.Context) {
	// If the report is ingested more than once, we change the report id and time
	// We also change the node id to avoid ES ingest 409 conflicts
	if !onesie {
		iReport.ReportUuid = uuid.Must(uuid.NewV4()).String()
		iReport.NodeUuid = uuid.Must(uuid.NewV4()).String()
		iReport.EndTime = time.Now().UTC().Format(time.RFC3339)
	}

	start := time.Now()
	payload, err := json.Marshal(iReport)
	if err != nil {
		logrus.Errorf("Report Marshal error, %v", err)
		return
	}

	stream, err := client.ProcessComplianceReport(ctx)
	if err != nil {
		logrus.Errorf("Report processing error, %v", err)
		return
	}

	reader := bytes.NewReader(payload)
	for reader.Len() > 0 {
		size := reader.Len()
		if size > maxSize {
			size = maxSize
		}
		// Every message gets its own backing array: a message must not be modified after it
		// was handed to Send, so one shared buffer cannot be refilled for the next chunk.
		chunk := make([]byte, size)
		n, err := reader.Read(chunk)
		if err != nil {
			logrus.Error("cannot read chunk to buffer: ", err)
			return
		}

		if err = stream.Send(&ingest.ReportData{Content: chunk[:n]}); err != nil {
			if err == io.EOF {
				// io.EOF from Send only says that the stream has ended; the status that
				// explains why is delivered on the receiving side.
				if _, recvErr := stream.CloseAndRecv(); recvErr != nil {
					err = recvErr
				}
			}
			logrus.Errorf("Report processing error, %v", err)
			return
		}
	}

	_, err = stream.CloseAndRecv()
	elapsed := time.Since(start)
	if err != nil && err != io.EOF {
		logrus.Errorf("ingest_call.go - Failed to send report %s, took %s, error: %s", iReport.ReportUuid, elapsed.Truncate(time.Millisecond), err)
		return
	}
	logrus.Infof("Thread %d, successfully ingested report %d (%s) in %s", threadNr, reportNr, iReport.ReportUuid, elapsed.Truncate(time.Millisecond))
}