-
Notifications
You must be signed in to change notification settings - Fork 1
/
handler.go
117 lines (96 loc) · 3.37 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package processor
import (
"context"
"encoding/json"
"github.com/Axway/agent-sdk/pkg/transaction"
"github.com/Axway/agent-sdk/pkg/util/log"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/google/uuid"
)
// EventsHandler -
type EventsHandler struct {
ctx context.Context
logger log.FieldLogger
metrics MetricsProcessor
logEntries []TrafficLogEntry
requestID string
eventGenerator func() transaction.EventGenerator
collectorGetter func() metricCollector
}
// NewEventsHandler - return a new EventProcessor
func NewEventsHandler(ctx context.Context, logData []byte) (*EventsHandler, error) {
requestID := uuid.NewString()
p := &EventsHandler{
ctx: ctx,
logger: log.NewLoggerFromContext(ctx).WithComponent("eventsHandler").WithPackage("processor").WithField(string(ctxRequestID), requestID),
requestID: requestID,
metrics: NewMetricsProcessor(ctx),
eventGenerator: transaction.NewEventGenerator,
collectorGetter: getMetricCollector,
}
p.logger.WithField("inputData", string(logData)).Debug("data sent from kong")
err := json.Unmarshal(logData, &p.logEntries)
if err != nil {
p.logger.WithError(err).Error("could not read log data")
return nil, err
}
return p, nil
}
// Handle - processes the batch of events from the http request
func (p *EventsHandler) Handle() []beat.Event {
events := make([]beat.Event, 0)
p.logger.WithField("numEvents", len(p.logEntries)).Info("handling events in request")
p.metrics.setCollector(p.collectorGetter())
for i, entry := range p.logEntries {
ctx := context.WithValue(p.ctx, ctxEntryIndex, i)
// skip any entry that does not have request or response info
if entry.Request == nil || entry.Response == nil {
continue
}
updatedEntry := p.validateTransaction(ctx, entry)
sample, err := p.metrics.process(updatedEntry)
if err != nil {
p.logger.WithError(err).Error("handling event for metric")
continue
}
if !sample {
continue
}
// Map the log entry to log event structure expected by AMPLIFY Central Observer
events = append(events, p.handleTransaction(ctx, updatedEntry)...)
}
return events
}
func (p *EventsHandler) validateTransaction(ctx context.Context, entry TrafficLogEntry) TrafficLogEntry {
logger := log.UpdateLoggerWithContext(ctx, p.logger)
logger.Trace("checking if any entry objects are nil")
if entry.Service == nil {
// service into is nil, lets add service data so the transaction will be processed still
logger.Debug("entry service details were nil, adding ErrorService info")
entry.Service = &Service{
Name: "ErrorService",
ID: "ErrorServiceID",
}
}
if entry.Route == nil {
logger.Debug("entry route details were nil, adding ErrorRoute info")
entry.Route = &Route{
Name: "ErrorRoute",
ID: "ErrorRouteID",
}
}
if entry.Latencies == nil {
logger.Debug("entry latencies details were nil, adding empty latency info")
entry.Latencies = &Latencies{}
}
return entry
}
func (p *EventsHandler) handleTransaction(ctx context.Context, entry TrafficLogEntry) []beat.Event {
log := p.logger.WithField(string(ctxEntryIndex), ctx.Value(ctxEntryIndex))
newEvents, err := NewTransactionProcessor(ctx).setEventGenerator(p.eventGenerator()).setEntry(entry).process()
if err != nil {
log.WithError(err).Error("executing transaction processor")
return []beat.Event{}
}
return newEvents
}