Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Releasing new layer versions
`sh zip.sh`


- The new wheel package gets released automatically after the tags are pushed using Github actions(Refer tagged-release in https://github.com/marvinpinto/action-automatic-releases).
- The new extension binary and zip files gets released automatically after the tags are pushed using Github actions(Refer tagged-release in https://github.com/marvinpinto/action-automatic-releases).

Run below commands to create and push tags

Expand Down
23 changes: 18 additions & 5 deletions lambda-extensions/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type LambdaExtensionConfig struct {
LambdaRegion string
SourceCategoryOverride string
EnhanceJsonLogs bool
EnableSpanDrops bool
}

var defaultLogTypes = []string{"platform", "function"}
Expand Down Expand Up @@ -66,6 +67,7 @@ func GetConfig() (*LambdaExtensionConfig, error) {
}
return config, nil
}

func (cfg *LambdaExtensionConfig) setDefaults() {
numRetry := os.Getenv("SUMO_NUM_RETRIES")
retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS")
Expand All @@ -75,6 +77,7 @@ func (cfg *LambdaExtensionConfig) setDefaults() {
enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER")
logTypes := os.Getenv("SUMO_LOG_TYPES")
enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS")
enableSpanDrops := os.Getenv("SUMO_SPAN_DROP")

if numRetry == "" {
cfg.NumRetry = 3
Expand All @@ -88,7 +91,6 @@ func (cfg *LambdaExtensionConfig) setDefaults() {
if maxConcurrentRequests == "" {
cfg.MaxConcurrentRequests = 3
}

if enableFailover == "" {
cfg.EnableFailover = false
}
Expand All @@ -106,6 +108,10 @@ func (cfg *LambdaExtensionConfig) setDefaults() {
if enhanceJsonLogs == "" {
cfg.EnhanceJsonLogs = true
}
if enableSpanDrops == "" {
// by default, spans will not be dropped if user did not configure the env variable
cfg.EnableSpanDrops = false
}
}

func (cfg *LambdaExtensionConfig) validateConfig() error {
Expand All @@ -116,6 +122,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER")
retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS")
enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS")
enableSpanDrops := os.Getenv("SUMO_SPAN_DROP")

var allErrors []string
var err error
Expand All @@ -139,7 +146,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
}
}

if cfg.EnableFailover == true {
if cfg.EnableFailover {
if cfg.S3BucketName == "" {
allErrors = append(allErrors, "SUMO_S3_BUCKET_NAME not set in environment variable")
}
Expand Down Expand Up @@ -173,25 +180,24 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
} else {
cfg.MaxDataQueueLength = int(customMaxDataQueueLength)
}

}

if maxConcurrentRequests != "" {
customMaxConcurrentRequests, err := strconv.ParseInt(maxConcurrentRequests, 10, 32)
if err != nil {
allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_MAX_CONCURRENT_REQUESTS: %v", err))
} else {
cfg.MaxConcurrentRequests = int(customMaxConcurrentRequests)
}

}

if logLevel != "" {
customloglevel, err := logrus.ParseLevel(logLevel)
if err != nil {
allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_LOG_LEVEL: %v", err))
} else {
cfg.LogLevel = customloglevel
}

}

if enhanceJsonLogs != "" {
Expand All @@ -201,6 +207,13 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
}
}

if enableSpanDrops != "" {
cfg.EnableSpanDrops, err = strconv.ParseBool(enableSpanDrops)
if err != nil {
allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_SPAN_DROP: %v", err))
}
}

// test valid log format type
for _, logType := range cfg.LogTypes {
if !utils.StringInSlice(strings.TrimSpace(logType), validLogTypes) {
Expand Down
2 changes: 1 addition & 1 deletion lambda-extensions/config/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

// ExtensionName same as binary name or file name where main exists
var ExtensionName = filepath.Base(os.Args[0])
var layerVersion = "4"
var layerVersion = "7"

// SumoLogicExtensionLayerVersionSuffix denotes the layer version published in AWS
var SumoLogicExtensionLayerVersionSuffix string = fmt.Sprintf("%s-prod:%s", ExtensionName, layerVersion)
47 changes: 47 additions & 0 deletions lambda-extensions/lambdaapi/telemetryapiclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package lambdaapi

import (
"bytes"
"context"
"encoding/json"
"fmt"
)

const (
// Base URL for telemetry api extension
telemetryURL = "2022-07-01/telemetry"
// Subscription Body Constants. Subscribe to platform logs and receive them on ${local_ip}:4243 via HTTP protocol.
telemetry_timeoutMs = 1000
telemetry_maxBytes = 1048576
telemetry_maxItems = 10000
telemetry_receiverPort = 4243
)

// SubscribeToLogsAPI is - Subscribe to Logs API to receive the Lambda Logs.
func (client *Client) SubscribeToTelemetryAPI(ctx context.Context, logEvents []string) ([]byte, error) {
URL := client.baseURL + telemetryURL

reqBody, err := json.Marshal(map[string]interface{}{
"destination": map[string]interface{}{"protocol": "HTTP", "URI": fmt.Sprintf("http://sandbox:%v", receiverPort)},
"types": logEvents,
"buffering": map[string]interface{}{"timeoutMs": telemetry_timeoutMs, "maxBytes": telemetry_maxBytes, "maxItems": telemetry_maxItems},
"schemaVersion": "2022-07-01",
})
if err != nil {
return nil, err
}
headers := map[string]string{
extensionIdentiferHeader: client.extensionID,
}
var response []byte
if ctx != nil {
response, err = client.MakeRequestWithContext(ctx, headers, bytes.NewBuffer(reqBody), "PUT", URL)
} else {
response, err = client.MakeRequest(headers, bytes.NewBuffer(reqBody), "PUT", URL)
}
if err != nil {
return nil, err
}

return response, nil
}
35 changes: 35 additions & 0 deletions lambda-extensions/lambdaapi/telemetryapiclient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package lambdaapi

import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
)

func TestSubscribeToTelemetryAPI(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assertEqual(t, r.Method, http.MethodPut, "Method is not PUT")
assertNotEmpty(t, r.Header.Get(extensionNameHeader), "Extension Name Header not present")

reqBytes, err := ioutil.ReadAll(r.Body)
assertNoError(t, err, "Received error")
defer r.Body.Close()
assertNotEmpty(t, reqBytes, "Received error in request")

w.Header().Add(extensionIdentiferHeader, "test-sumo-id")
w.WriteHeader(200)
}))

defer srv.Close()
client := NewClient(srv.URL[7:], extensionName)

// Without Context
response, err := client.SubscribeToTelemetryAPI(nil, []string{"platform", "function", "extension"})
commonAsserts(t, client, response, err)

// With Context
response, err = client.SubscribeToTelemetryAPI(context.Background(), []string{"platform", "function", "extension"})
commonAsserts(t, client, response, err)
}
36 changes: 27 additions & 9 deletions lambda-extensions/sumoclient/sumoclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s *sumoLogicClient) failoverHandler(buf *bytes.Buffer) error {
}
err = utils.UploadToS3(&s.config.S3BucketName, &keyName, buf)
if err != nil {
err = fmt.Errorf("Failed to Send to S3 Bucket %s Path %s: %w", s.config.S3BucketName, keyName, err)
err = fmt.Errorf("failed to send to s3 bucket %s path %s: %w", s.config.S3BucketName, keyName, err)
}
return err
}
Expand Down Expand Up @@ -159,13 +159,22 @@ func (s *sumoLogicClient) createCWLogLine(item map[string]interface{}) {

message, ok := item["record"].(map[string]interface{})
if ok {
delete(item, "record")
s.logger.Debug("Not dropping record, if logType is platform.report.")
// delete(item, "record")
}

// Todo convert this to struct
// Updated cwMessageLine to also cover new field initDurationMs as record.metrics do have it.
metric := message["metrics"].(map[string]interface{})
cwMessageLine := fmt.Sprintf("REPORT RequestId: %v Duration: %v ms Billed Duration: %v ms Memory Size: %v MB Max Memory Used: %v MB",
message["requestId"], metric["durationMs"], metric["billedDurationMs"], metric["memorySizeMB"], metric["maxMemoryUsedMB"])
item["message"] = cwMessageLine
if metric["initDurationMs"] == nil {
cwMessageLine := fmt.Sprintf("REPORT RequestId: %v Duration: %v ms Billed Duration: %v ms Memory Size: %v MB Max Memory Used: %v MB",
message["requestId"], metric["durationMs"], metric["billedDurationMs"], metric["memorySizeMB"], metric["maxMemoryUsedMB"])
item["message"] = cwMessageLine
} else {
cwMessageLine := fmt.Sprintf("REPORT RequestId: %v Duration: %v ms Billed Duration: %v ms Memory Size: %v MB Max Memory Used: %v MB Init Duration: %v ms",
message["requestId"], metric["durationMs"], metric["billedDurationMs"], metric["memorySizeMB"], metric["maxMemoryUsedMB"], metric["initDurationMs"])
item["message"] = cwMessageLine
}
}

func (s *sumoLogicClient) getLogGroup() string {
Expand Down Expand Up @@ -210,17 +219,26 @@ func (s *sumoLogicClient) enhanceLogs(msg responseBody) {
}
} else if ok && logType == "platform.report" {
s.createCWLogLine(item)
} else if ok && logType == "platform.runtimeDone" {
message, ok := item["record"].(map[string]interface{})
if ok {
_, ok := message["spans"]
if ok && s.config.EnableSpanDrops {
// dropping spans if its present and configured to drop
delete(message, "spans")
}
}
}
}
}

func (s *sumoLogicClient) transformBytesToArrayOfMap(rawmsg []byte) (responseBody, error) {
s.logger.Debugln("Transforming bytes to array of maps")
var msg responseBody
var err error
err = json.Unmarshal(rawmsg, &msg)
// var err error
var err error = json.Unmarshal(rawmsg, &msg)
if err != nil {
return msg, fmt.Errorf("Error in parsing payload %s: %v", string(rawmsg), err)
return msg, fmt.Errorf("error in parsing payload %s: %v", string(rawmsg), err)
}
return msg, err
}
Expand Down Expand Up @@ -253,7 +271,7 @@ func (s *sumoLogicClient) createChunks(msgArr responseBody) ([]string, error) {
}
chunks = append(chunks, currentChunk.String())
if errorCount > 0 {
err = fmt.Errorf("Dropping %d messages due to json parsing error", errorCount)
err = fmt.Errorf("dropping %d messages due to json parsing error", errorCount)
}
s.logger.Debugf("Chunks created: %d NumOfParsingError: %d", len(chunks), errorCount)
return chunks, err
Expand Down
9 changes: 5 additions & 4 deletions lambda-extensions/sumologic-extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ func runTimeAPIInit() (int64, error) {
}
logger.Debug("Succcessfully Registered with Run Time API Client: ", utils.PrettyPrint(registerResponse))

// Subscribe to Logs API
logger.Debug("Subscribing Extension to Logs API........")
subscribeResponse, err := extensionClient.SubscribeToLogsAPI(nil, config.LogTypes)
// Subscribe to Telemetry API
logger.Debug("Subscribing Extension to Telemetry API........")
subscribeResponse, err := extensionClient.SubscribeToTelemetryAPI(nil, config.LogTypes)
if err != nil {
return 0, err
}
logger.Debug("Successfully subscribed to Logs API: ", utils.PrettyPrint(string(subscribeResponse)))

logger.Debug("Successfully subscribed to Telemetry API: ", utils.PrettyPrint(string(subscribeResponse)))

// Call next to say registration is successful and get the deadtimems
nextResponse, err := nextEvent(nil)
Expand Down