/
client.go
155 lines (135 loc) · 4.42 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package logsapi
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"strings"
"github.com/honeycombio/honeycomb-lambda-extension/extension"
)
// Protocol represents the protocol that this extension should receive logs by
type Protocol string
// LogType represents the types of log messages that are supported by the logs API
type LogType string
const (
// Local hostname according to process running in extension env
localHostname = "sandbox"
// HTTPProtocol is the protocol that we receive logs over
HTTPProtocol Protocol = "HTTP"
// PlatformLog events originate from the Lambda Runtime
PlatformLog LogType = "platform"
// FunctionLog events originate from Lambda Functions
FunctionLog LogType = "function"
// extensionIdentifierHeader is used to pass a generated UUID to calls to the API
extensionIdentifierHeader = "Lambda-Extension-Identifier"
)
// Destination is where the runtime should send logs to
type Destination struct {
Protocol Protocol `json:"protocol"`
URI string `json:"URI"`
}
// BufferingOptions contains buffering configuration options for the lambda platform
type BufferingOptions struct {
TimeoutMS uint `json:"timeoutMs"`
MaxBytes uint64 `json:"maxBytes"`
MaxItems uint64 `json:"maxItems"`
}
// Client is used to communicate with the Logs API
type Client struct {
baseURL string
httpClient *http.Client
destinationPort int
bufferingOptions BufferingOptions
ExtensionID string
}
// SubscribeRequest is the request to /logs
type SubscribeRequest struct {
Dest Destination `json:"destination"`
Types []LogType `json:"types"`
Buffering BufferingOptions `json:"buffering"`
}
// SubscribeResponse is the response from /logs subscribe message
type SubscribeResponse struct {
Message string
}
// Subscribe wraps the logic of creating a client for the AWS Lambda Logs API
// and using the client to subscribe the extension to the configured log type streams.
func Subscribe(ctx context.Context, config extension.Config, extensionID string) (*SubscribeResponse, error) {
// create logs api client
logsClient := newClient(config.RuntimeAPI, config.LogsReceiverPort, BufferingOptions{
TimeoutMS: uint(config.LogsAPITimeoutMS),
MaxBytes: uint64(config.LogsAPIMaxBytes),
MaxItems: uint64(config.LogsAPIMaxItems),
})
var logTypes []LogType
disablePlatformMsg := config.LogsAPIDisablePlatformMessages
if disablePlatformMsg {
logTypes = []LogType{FunctionLog}
} else {
logTypes = []LogType{PlatformLog, FunctionLog}
}
return logsClient.subscribeToLogTypes(ctx, extensionID, logTypes)
}
// newClient returns a new Lambda Logs API client
func newClient(baseURL string, port int, bufferingOpts BufferingOptions) *Client {
if !strings.HasPrefix(baseURL, "http") {
baseURL = fmt.Sprintf("http://%s", baseURL)
}
baseURL = fmt.Sprintf("%s/2020-08-15", baseURL)
return &Client{
baseURL: baseURL,
httpClient: &http.Client{},
destinationPort: port,
bufferingOptions: bufferingOpts,
}
}
// subscribeToLogTypes will subscribe to event streams sent
// from the Logs API of the given log types.
func (c *Client) subscribeToLogTypes(ctx context.Context, extensionID string, types []LogType) (*SubscribeResponse, error) {
subscribe := SubscribeRequest{
Dest: Destination{
Protocol: HTTPProtocol,
URI: fmt.Sprintf("http://%s:%d", localHostname, c.destinationPort),
},
Types: types,
Buffering: c.bufferingOptions,
}
reqBody, err := json.Marshal(subscribe)
if err != nil {
return nil, err
}
httpReq, err := http.NewRequestWithContext(ctx, "PUT", c.url("/logs"), bytes.NewBuffer(reqBody))
if err != nil {
return nil, err
}
httpReq.Header.Set(extensionIdentifierHeader, extensionID)
httpRes, err := c.httpClient.Do(httpReq)
if err != nil {
return nil, err
}
if httpRes.StatusCode >= 400 {
return nil, fmt.Errorf("request failed with status %s", httpRes.Status)
}
defer httpRes.Body.Close()
body, err := ioutil.ReadAll(httpRes.Body)
if err != nil {
return nil, err
}
c.ExtensionID = httpRes.Header.Get(extensionIdentifierHeader)
return &SubscribeResponse{
Message: string(body),
}, nil
}
// url is a helper function to build urls out of relative paths
func (c *Client) url(requestPath string) string {
newURL, err := url.Parse(c.baseURL)
if err != nil {
return fmt.Sprintf("%s%s", c.baseURL, requestPath)
}
newURL.Path = path.Join(newURL.Path, requestPath)
return newURL.String()
}