generated from TBD54566975/tbd-project-template
-
Notifications
You must be signed in to change notification settings - Fork 7
/
deployment_logs.go
91 lines (77 loc) · 1.95 KB
/
deployment_logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package controller
import (
"context"
"fmt"
"time"
"github.com/alecthomas/types"
"github.com/TBD54566975/ftl/backend/common/log"
"github.com/TBD54566975/ftl/backend/common/model"
"github.com/TBD54566975/ftl/backend/controller/dal"
)
// Compile-time assertion that *deploymentLogsSink satisfies log.Sink.
var _ log.Sink = (*deploymentLogsSink)(nil)
// newDeploymentLogsSink creates a sink backed by a buffered queue of 10000
// entries and starts a goroutine running processLogs with ctx before
// returning the sink.
func newDeploymentLogsSink(ctx context.Context, dal *dal.DAL) *deploymentLogsSink {
	queue := make(chan log.Entry, 10000)
	s := &deploymentLogsSink{dal: dal, logQueue: queue}

	// The queue is consumed asynchronously so that Log never has to wait on
	// the DAL.
	go s.processLogs(ctx)

	return s
}
// deploymentLogsSink is a log.Sink that queues log entries in memory and
// writes them out through the DAL from a background goroutine.
type deploymentLogsSink struct {
// logQueue carries entries from Log to processLogs.
logQueue chan log.Entry
// dal is used by processLogs to insert log events.
dal *dal.DAL
}
// Log implements log.Sink.
//
// The entry is offered to the queue without blocking. If the queue has no
// free slot the entry is dropped and an error is returned; otherwise Log
// returns nil.
func (d *deploymentLogsSink) Log(entry log.Entry) error {
	select {
	case d.logQueue <- entry:
		return nil
	default:
	}

	// Queue is full: drop the entry instead of blocking the caller.
	return fmt.Errorf("log queue is full")
}
// processLogs drains the log queue until ctx is cancelled, inserting each
// convertible entry through the DAL.
//
// Entries without a usable "deployment" attribute are skipped. Insert
// failures are reported with fmt.Printf and do not stop the loop. Entries
// still sitting in the queue when ctx is cancelled are not processed.
func (d *deploymentLogsSink) processLogs(ctx context.Context) {
	// One ticker for the lifetime of the loop. Calling time.After inside the
	// select would allocate a fresh timer on every iteration, i.e. once per
	// processed log entry.
	idle := time.NewTicker(1 * time.Second)
	defer idle.Stop()

	for {
		select {
		case entry := <-d.logQueue:
			event, ok := d.toLogEvent(entry)
			if !ok {
				continue
			}
			if err := d.dal.InsertLogEvent(ctx, event); err != nil {
				fmt.Printf("failed to insert log entry: %v :: error: %v\n", entry, err)
			}

		case <-ctx.Done():
			return

		case <-idle.C:
			// Periodic wake-up with no work attached; the loop simply
			// re-enters the select.
		}
	}
}

// toLogEvent converts a log entry into the event passed to InsertLogEvent.
//
// The second result is false when the entry has no "deployment" attribute or
// that attribute does not parse as a deployment name; such entries are not
// stored. A missing or unparseable "request" attribute leaves RequestName
// unset, and Error is only set when the entry carries an error.
func (d *deploymentLogsSink) toLogEvent(entry log.Entry) (*dal.LogEvent, bool) {
	depStr, ok := entry.Attributes["deployment"]
	if !ok {
		return nil, false
	}
	deployment, err := model.ParseDeploymentName(depStr)
	if err != nil {
		return nil, false
	}

	var errorStr types.Option[string]
	if entry.Error != nil {
		errorStr = types.Some(entry.Error.Error())
	}

	var request types.Option[model.RequestName]
	if reqStr, ok := entry.Attributes["request"]; ok {
		// A request attribute that fails to parse is ignored rather than
		// causing the whole entry to be dropped.
		if _, req, err := model.ParseRequestName(reqStr); err == nil {
			request = types.Some(req)
		}
	}

	return &dal.LogEvent{
		RequestName:    request,
		DeploymentName: deployment,
		Time:           entry.Time,
		Level:          int32(entry.Level.Severity()),
		Attributes:     entry.Attributes,
		Message:        entry.Message,
		Error:          errorStr,
	}, true
}