diff --git a/README.md b/README.md index 473492a..c78fcef 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ Releasing new layer versions `sh zip.sh` -- The new wheel package gets released automatically after the tags are pushed using Github actions(Refer tagged-release in https://github.com/marvinpinto/action-automatic-releases). +- The new extension binary and zip files gets released automatically after the tags are pushed using Github actions(Refer tagged-release in https://github.com/marvinpinto/action-automatic-releases). Run below commands to create and push tags diff --git a/lambda-extensions/config/config.go b/lambda-extensions/config/config.go index 53fc596..b087641 100644 --- a/lambda-extensions/config/config.go +++ b/lambda-extensions/config/config.go @@ -35,6 +35,7 @@ type LambdaExtensionConfig struct { LambdaRegion string SourceCategoryOverride string EnhanceJsonLogs bool + EnableSpanDrops bool } var defaultLogTypes = []string{"platform", "function"} @@ -66,6 +67,7 @@ func GetConfig() (*LambdaExtensionConfig, error) { } return config, nil } + func (cfg *LambdaExtensionConfig) setDefaults() { numRetry := os.Getenv("SUMO_NUM_RETRIES") retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS") @@ -75,6 +77,7 @@ func (cfg *LambdaExtensionConfig) setDefaults() { enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER") logTypes := os.Getenv("SUMO_LOG_TYPES") enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS") + enableSpanDrops := os.Getenv("SUMO_SPAN_DROP") if numRetry == "" { cfg.NumRetry = 3 @@ -88,7 +91,6 @@ func (cfg *LambdaExtensionConfig) setDefaults() { if maxConcurrentRequests == "" { cfg.MaxConcurrentRequests = 3 } - if enableFailover == "" { cfg.EnableFailover = false } @@ -106,6 +108,10 @@ func (cfg *LambdaExtensionConfig) setDefaults() { if enhanceJsonLogs == "" { cfg.EnhanceJsonLogs = true } + if enableSpanDrops == "" { + // by default, spans will not be dropped if user did not configure the env variable + cfg.EnableSpanDrops = false + } } func (cfg *LambdaExtensionConfig) validateConfig() error { @@ -116,6 +122,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER") retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS") enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS") + enableSpanDrops := os.Getenv("SUMO_SPAN_DROP") var allErrors []string var err error @@ -139,7 +146,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { } } - if cfg.EnableFailover == true { + if cfg.EnableFailover { if cfg.S3BucketName == "" { allErrors = append(allErrors, "SUMO_S3_BUCKET_NAME not set in environment variable") } @@ -173,8 +180,8 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { } else { cfg.MaxDataQueueLength = int(customMaxDataQueueLength) } - } + if maxConcurrentRequests != "" { customMaxConcurrentRequests, err := strconv.ParseInt(maxConcurrentRequests, 10, 32) if err != nil { @@ -182,8 +189,8 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { } else { cfg.MaxConcurrentRequests = int(customMaxConcurrentRequests) } - } + if logLevel != "" { customloglevel, err := logrus.ParseLevel(logLevel) if err != nil { @@ -191,7 +198,6 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { } else { cfg.LogLevel = customloglevel } - } if enhanceJsonLogs != "" { @@ -201,6 +207,13 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { } } + if enableSpanDrops != "" { + cfg.EnableSpanDrops, err = strconv.ParseBool(enableSpanDrops) + if err != nil { + allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_SPAN_DROP: %v", err)) + } + } + // test valid log format type for _, logType := range cfg.LogTypes { if !utils.StringInSlice(strings.TrimSpace(logType), validLogTypes) { diff --git a/lambda-extensions/config/version.go b/lambda-extensions/config/version.go index da98125..e9fe1ff 100644 --- a/lambda-extensions/config/version.go +++ b/lambda-extensions/config/version.go @@ -8,7 +8,7 @@ import ( // ExtensionName same as binary name or file name where main exists var ExtensionName = filepath.Base(os.Args[0]) -var layerVersion = "4" +var layerVersion = "7" // SumoLogicExtensionLayerVersionSuffix denotes the layer version published in AWS var SumoLogicExtensionLayerVersionSuffix string = fmt.Sprintf("%s-prod:%s", ExtensionName, layerVersion) diff --git a/lambda-extensions/lambdaapi/telemetryapiclient.go b/lambda-extensions/lambdaapi/telemetryapiclient.go new file mode 100644 index 0000000..f59d74b --- /dev/null +++ b/lambda-extensions/lambdaapi/telemetryapiclient.go @@ -0,0 +1,47 @@ +package lambdaapi + +import ( + "bytes" + "context" + "encoding/json" + "fmt" +) + +const ( + // Base URL for telemetry api extension + telemetryURL = "2022-07-01/telemetry" + // Subscription Body Constants. Subscribe to platform logs and receive them on ${local_ip}:4243 via HTTP protocol. + telemetry_timeoutMs = 1000 + telemetry_maxBytes = 1048576 + telemetry_maxItems = 10000 + telemetry_receiverPort = 4243 +) + +// SubscribeToLogsAPI is - Subscribe to Logs API to receive the Lambda Logs. +func (client *Client) SubscribeToTelemetryAPI(ctx context.Context, logEvents []string) ([]byte, error) { + URL := client.baseURL + telemetryURL + + reqBody, err := json.Marshal(map[string]interface{}{ + "destination": map[string]interface{}{"protocol": "HTTP", "URI": fmt.Sprintf("http://sandbox:%v", receiverPort)}, + "types": logEvents, + "buffering": map[string]interface{}{"timeoutMs": telemetry_timeoutMs, "maxBytes": telemetry_maxBytes, "maxItems": telemetry_maxItems}, + "schemaVersion": "2022-07-01", + }) + if err != nil { + return nil, err + } + headers := map[string]string{ + extensionIdentiferHeader: client.extensionID, + } + var response []byte + if ctx != nil { + response, err = client.MakeRequestWithContext(ctx, headers, bytes.NewBuffer(reqBody), "PUT", URL) + } else { + response, err = client.MakeRequest(headers, bytes.NewBuffer(reqBody), "PUT", URL) + } + if err != nil { + return nil, err + } + + return response, nil +} diff --git a/lambda-extensions/lambdaapi/telemetryapiclient_test.go b/lambda-extensions/lambdaapi/telemetryapiclient_test.go new file mode 100644 index 0000000..54e254c --- /dev/null +++ b/lambda-extensions/lambdaapi/telemetryapiclient_test.go @@ -0,0 +1,35 @@ +package lambdaapi + +import ( + "context" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" +) + +func TestSubscribeToTelemetryAPI(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assertEqual(t, r.Method, http.MethodPut, "Method is not PUT") + assertNotEmpty(t, r.Header.Get(extensionNameHeader), "Extension Name Header not present") + + reqBytes, err := ioutil.ReadAll(r.Body) + assertNoError(t, err, "Received error") + defer r.Body.Close() + assertNotEmpty(t, reqBytes, "Received error in request") + + w.Header().Add(extensionIdentiferHeader, "test-sumo-id") + w.WriteHeader(200) + })) + + defer srv.Close() + client := NewClient(srv.URL[7:], extensionName) + + // Without Context + response, err := client.SubscribeToTelemetryAPI(nil, []string{"platform", "function", "extension"}) + commonAsserts(t, client, response, err) + + // With Context + response, err = client.SubscribeToTelemetryAPI(context.Background(), []string{"platform", "function", "extension"}) + commonAsserts(t, client, response, err) +} diff --git a/lambda-extensions/sumoclient/sumoclient.go b/lambda-extensions/sumoclient/sumoclient.go index b08b6dd..781d6f5 100644 --- a/lambda-extensions/sumoclient/sumoclient.go +++ b/lambda-extensions/sumoclient/sumoclient.go @@ -101,7 +101,7 @@ func (s *sumoLogicClient) failoverHandler(buf *bytes.Buffer) error { } err = utils.UploadToS3(&s.config.S3BucketName, &keyName, buf) if err != nil { - err = fmt.Errorf("Failed to Send to S3 Bucket %s Path %s: %w", s.config.S3BucketName, keyName, err) + err = fmt.Errorf("failed to send to s3 bucket %s path %s: %w", s.config.S3BucketName, keyName, err) } return err } @@ -159,13 +159,22 @@ func (s *sumoLogicClient) createCWLogLine(item map[string]interface{}) { message, ok := item["record"].(map[string]interface{}) if ok { - delete(item, "record") + s.logger.Debug("Not dropping record, if logType is platform.report.") + // delete(item, "record") } + // Todo convert this to struct + // Updated cwMessageLine to also cover new field initDurationMs as record.metrics do have it. metric := message["metrics"].(map[string]interface{}) - cwMessageLine := fmt.Sprintf("REPORT RequestId: %v Duration: %v ms Billed Duration: %v ms Memory Size: %v MB Max Memory Used: %v MB", - message["requestId"], metric["durationMs"], metric["billedDurationMs"], metric["memorySizeMB"], metric["maxMemoryUsedMB"]) - item["message"] = cwMessageLine + if metric["initDurationMs"] == nil { + cwMessageLine := fmt.Sprintf("REPORT RequestId: %v Duration: %v ms Billed Duration: %v ms Memory Size: %v MB Max Memory Used: %v MB", + message["requestId"], metric["durationMs"], metric["billedDurationMs"], metric["memorySizeMB"], metric["maxMemoryUsedMB"]) + item["message"] = cwMessageLine + } else { + cwMessageLine := fmt.Sprintf("REPORT RequestId: %v Duration: %v ms Billed Duration: %v ms Memory Size: %v MB Max Memory Used: %v MB Init Duration: %v ms", + message["requestId"], metric["durationMs"], metric["billedDurationMs"], metric["memorySizeMB"], metric["maxMemoryUsedMB"], metric["initDurationMs"]) + item["message"] = cwMessageLine + } } func (s *sumoLogicClient) getLogGroup() string { @@ -210,6 +219,15 @@ func (s *sumoLogicClient) enhanceLogs(msg responseBody) { } } else if ok && logType == "platform.report" { s.createCWLogLine(item) + } else if ok && logType == "platform.runtimeDone" { + message, ok := item["record"].(map[string]interface{}) + if ok { + _, ok := message["spans"] + if ok && s.config.EnableSpanDrops { + // dropping spans if its present and configured to drop + delete(message, "spans") + } + } } } } @@ -217,10 +235,10 @@ func (s *sumoLogicClient) enhanceLogs(msg responseBody) { func (s *sumoLogicClient) transformBytesToArrayOfMap(rawmsg []byte) (responseBody, error) { s.logger.Debugln("Transforming bytes to array of maps") var msg responseBody - var err error - err = json.Unmarshal(rawmsg, &msg) + // var err error + var err error = json.Unmarshal(rawmsg, &msg) if err != nil { - return msg, fmt.Errorf("Error in parsing payload %s: %v", string(rawmsg), err) + return msg, fmt.Errorf("error in parsing payload %s: %v", string(rawmsg), err) } return msg, err } @@ -253,7 +271,7 @@ func (s *sumoLogicClient) createChunks(msgArr responseBody) ([]string, error) { } chunks = append(chunks, currentChunk.String()) if errorCount > 0 { - err = fmt.Errorf("Dropping %d messages due to json parsing error", errorCount) + err = fmt.Errorf("dropping %d messages due to json parsing error", errorCount) } s.logger.Debugf("Chunks created: %d NumOfParsingError: %d", len(chunks), errorCount) return chunks, err diff --git a/lambda-extensions/sumologic-extension.go b/lambda-extensions/sumologic-extension.go index 1bdf866..5fca432 100644 --- a/lambda-extensions/sumologic-extension.go +++ b/lambda-extensions/sumologic-extension.go @@ -64,13 +64,14 @@ func runTimeAPIInit() (int64, error) { } logger.Debug("Succcessfully Registered with Run Time API Client: ", utils.PrettyPrint(registerResponse)) - // Subscribe to Logs API - logger.Debug("Subscribing Extension to Logs API........") - subscribeResponse, err := extensionClient.SubscribeToLogsAPI(nil, config.LogTypes) + // Subscribe to Telemetry API + logger.Debug("Subscribing Extension to Telemetry API........") + subscribeResponse, err := extensionClient.SubscribeToTelemetryAPI(nil, config.LogTypes) if err != nil { return 0, err } - logger.Debug("Successfully subscribed to Logs API: ", utils.PrettyPrint(string(subscribeResponse))) + + logger.Debug("Successfully subscribed to Telemetry API: ", utils.PrettyPrint(string(subscribeResponse))) // Call next to say registration is successful and get the deadtimems nextResponse, err := nextEvent(nil)