-
Notifications
You must be signed in to change notification settings - Fork 583
/
logs.go
153 lines (137 loc) · 4.91 KB
/
logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package agentapi
import (
"context"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/codersdk/agentsdk"
)
// LogsAPI implements the agent-facing log ingestion endpoint
// (BatchCreateLogs). All collaborators are injected as fields.
type LogsAPI struct {
// AgentFn resolves the workspace agent the current request belongs to.
// BatchCreateLogs calls it unconditionally, before doing anything else.
AgentFn func(context.Context) (database.WorkspaceAgent, error)
// Database is the store used to insert log sources and logs and to flag
// an agent's logs as overflowed.
Database database.Store
// Log receives a warning when flagging the agent as overflowed fails.
Log slog.Logger
// PublishWorkspaceUpdateFn is optional (skipped when nil). BatchCreateLogs
// invokes it when the log limit is hit and when the agent had no logs yet
// (LogsLength == 0) before the current batch.
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent) error
// PublishWorkspaceAgentLogsUpdateFn is optional (skipped when nil).
// BatchCreateLogs invokes it after a successful insert, with CreatedAfter
// set to one less than the ID of the first inserted log.
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
// TimeNowFn optionally overrides the clock used for CreatedAt timestamps.
TimeNowFn func() time.Time // defaults to dbtime.Now()
}
// now reports the current time for this API. A configured TimeNowFn takes
// precedence; when it is unset the value comes from dbtime.Now().
func (a *LogsAPI) now() time.Time {
	if clock := a.TimeNowFn; clock != nil {
		return clock()
	}
	return dbtime.Now()
}
// BatchCreateLogs stores a batch of log lines for the workspace agent
// resolved by AgentFn and notifies subscribers about the new logs.
//
// Behavior, in order:
//   - If the agent is already marked LogsOverflowed, nothing is written and
//     the response has LogLimitExceeded set.
//   - An empty batch is a no-op that returns an empty response.
//   - req.LogSourceId must parse as a UUID, otherwise an error is returned.
//     The nil UUID selects the "External" log source
//     (agentsdk.ExternalLogSourceID), which is inserted on first use; a
//     unique violation on that insert means it already exists and is fine.
//   - Log levels not recognized here are stored as "info".
//   - If inserting the logs fails with the workspace agent logs limit error,
//     the agent is flagged as overflowed (best effort, failure is only
//     logged), a workspace update is published, and the response has
//     LogLimitExceeded set.
//   - Otherwise a logs-update notification is published, followed by a
//     workspace update when the agent had LogsLength == 0 beforehand.
//
// The error from AgentFn is returned as-is; all other errors are wrapped with
// context describing the failed step.
func (a *LogsAPI) BatchCreateLogs(ctx context.Context, req *agentproto.BatchCreateLogsRequest) (*agentproto.BatchCreateLogsResponse, error) {
	// maxOutputLength is the largest value the int32 OutputLength insert
	// parameter can represent.
	const maxOutputLength = 1<<31 - 1

	workspaceAgent, err := a.AgentFn(ctx)
	if err != nil {
		return nil, err
	}
	if workspaceAgent.LogsOverflowed {
		return &agentproto.BatchCreateLogsResponse{LogLimitExceeded: true}, nil
	}
	if len(req.Logs) == 0 {
		return &agentproto.BatchCreateLogsResponse{}, nil
	}

	logSourceID, err := uuid.FromBytes(req.LogSourceId)
	if err != nil {
		return nil, xerrors.Errorf("parse log source ID %q: %w", req.LogSourceId, err)
	}

	// This is to support the legacy API where the log source ID was
	// not provided in the request body. We default to the external
	// log source in this case.
	if logSourceID == uuid.Nil {
		// Use the external log source, creating it if it does not exist yet.
		externalSources, err := a.Database.InsertWorkspaceAgentLogSources(ctx, database.InsertWorkspaceAgentLogSourcesParams{
			WorkspaceAgentID: workspaceAgent.ID,
			CreatedAt:        a.now(),
			ID:               []uuid.UUID{agentsdk.ExternalLogSourceID},
			DisplayName:      []string{"External"},
			Icon:             []string{"/emojis/1f310.png"},
		})
		if database.IsUniqueViolation(err, database.UniqueWorkspaceAgentLogSourcesPkey) {
			// The external source already exists for this agent; reuse it.
			err = nil
			logSourceID = agentsdk.ExternalLogSourceID
		}
		if err != nil {
			return nil, xerrors.Errorf("insert external workspace agent log source: %w", err)
		}
		if len(externalSources) == 1 {
			logSourceID = externalSources[0].ID
		}
	}

	// The final length of both slices is known up front, so size them once
	// instead of growing them on every append.
	output := make([]string, 0, len(req.Logs))
	level := make([]database.LogLevel, 0, len(req.Logs))
	outputLength := 0
	for _, logEntry := range req.Logs {
		output = append(output, logEntry.Output)
		outputLength += len(logEntry.Output)

		var dbLevel database.LogLevel
		switch logEntry.Level {
		case agentproto.Log_TRACE:
			dbLevel = database.LogLevelTrace
		case agentproto.Log_DEBUG:
			dbLevel = database.LogLevelDebug
		case agentproto.Log_INFO:
			dbLevel = database.LogLevelInfo
		case agentproto.Log_WARN:
			dbLevel = database.LogLevelWarn
		case agentproto.Log_ERROR:
			dbLevel = database.LogLevelError
		default:
			// Default to "info" to support older clients that didn't have the
			// level field.
			dbLevel = database.LogLevelInfo
		}
		level = append(level, dbLevel)
	}

	// Converting an int above the int32 range would silently wrap (possibly
	// to a negative length), so saturate instead of wrapping.
	if outputLength > maxOutputLength {
		outputLength = maxOutputLength
	}

	logs, err := a.Database.InsertWorkspaceAgentLogs(ctx, database.InsertWorkspaceAgentLogsParams{
		AgentID:      workspaceAgent.ID,
		CreatedAt:    a.now(),
		Output:       output,
		Level:        level,
		LogSourceID:  logSourceID,
		OutputLength: int32(outputLength),
	})
	if err != nil {
		if !database.IsWorkspaceAgentLogsLimitError(err) {
			return nil, xerrors.Errorf("insert workspace agent logs: %w", err)
		}

		// The log limit was hit: remember that on the agent record.
		overflowErr := a.Database.UpdateWorkspaceAgentLogOverflowByID(ctx, database.UpdateWorkspaceAgentLogOverflowByIDParams{
			ID:             workspaceAgent.ID,
			LogsOverflowed: true,
		})
		if overflowErr != nil {
			// We don't want to return here, because the agent will retry on
			// failure and this isn't a huge deal. The overflow state is just a
			// hint to the user that the logs are incomplete.
			a.Log.Warn(ctx, "failed to update workspace agent log overflow", slog.Error(overflowErr))
		}

		if a.PublishWorkspaceUpdateFn != nil {
			err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent)
			if err != nil {
				return nil, xerrors.Errorf("publish workspace update: %w", err)
			}
		}
		return &agentproto.BatchCreateLogsResponse{LogLimitExceeded: true}, nil
	}

	// Publish by the lowest log ID inserted so the log stream will fetch
	// everything from that point. The first returned row is taken to be the
	// lowest; skip the notification if the store returned no rows at all
	// rather than panicking on the index.
	if a.PublishWorkspaceAgentLogsUpdateFn != nil && len(logs) > 0 {
		lowestLogID := logs[0].ID
		a.PublishWorkspaceAgentLogsUpdateFn(ctx, workspaceAgent.ID, agentsdk.LogsNotifyMessage{
			CreatedAfter: lowestLogID - 1,
		})
	}

	if workspaceAgent.LogsLength == 0 && a.PublishWorkspaceUpdateFn != nil {
		// If these are the first logs being appended, we publish a UI update
		// to notify the UI that logs are now available.
		err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent)
		if err != nil {
			return nil, xerrors.Errorf("publish workspace update: %w", err)
		}
	}
	return &agentproto.BatchCreateLogsResponse{}, nil
}