diff --git a/exporter/collector/go.mod b/exporter/collector/go.mod index 4503b2248..176d0db2c 100644 --- a/exporter/collector/go.mod +++ b/exporter/collector/go.mod @@ -25,6 +25,7 @@ require ( go.uber.org/zap v1.23.0 golang.org/x/oauth2 v0.8.0 google.golang.org/api v0.126.0 + google.golang.org/genproto v0.0.0-20230731193218-e0aa005b6bdf google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e google.golang.org/grpc v1.57.0 google.golang.org/protobuf v1.31.0 @@ -61,7 +62,6 @@ require ( golang.org/x/sys v0.9.0 // indirect golang.org/x/text v0.10.0 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20230731193218-e0aa005b6bdf // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/exporter/collector/integrationtest/go.sum b/exporter/collector/integrationtest/go.sum index c6f850fc6..c08e3c99a 100644 --- a/exporter/collector/integrationtest/go.sum +++ b/exporter/collector/integrationtest/go.sum @@ -27,7 +27,6 @@ cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGB cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= -cloud.google.com/go/iam v1.1.1 h1:lW7fzj15aVIXYHREOqjRBV9PsH0Z6u8Y46a1YGvQP4Y= cloud.google.com/go/logging v1.7.0 h1:CJYxlNNNNAMkHp9em/YEXcfJg+rPDg7YfwoRpMU+t5I= cloud.google.com/go/logging v1.7.0/go.mod h1:3xjP2CjkM3ZkO73aj4ASA5wRPGGCRrPIAeNqVNkzY8M= cloud.google.com/go/longrunning v0.5.1 h1:Fr7TXftcqTudoyRJa113hyaqlGdiBQkp0Gq7tErFDWI= diff --git a/exporter/collector/logs.go b/exporter/collector/logs.go index 9d574606c..a4e8d848c 100644 --- a/exporter/collector/logs.go +++ b/exporter/collector/logs.go @@ -15,27 +15,31 @@ package collector import ( + "bytes" "context" "encoding/hex" "encoding/json" "fmt" "math" - "net/http" "net/url" "strings" "time" + "unicode/utf8" - "cloud.google.com/go/logging" loggingv2 "cloud.google.com/go/logging/apiv2" logpb "cloud.google.com/go/logging/apiv2/loggingpb" "github.com/googleapis/gax-go/v2" "go.uber.org/multierr" "go.uber.org/zap" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + logtypepb "google.golang.org/genproto/googleapis/logging/type" "google.golang.org/grpc" "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -59,32 +63,32 @@ const ( // severityMapping maps the integer severity level values from OTel [0-24] // to matching Cloud Logging severity levels. -var severityMapping = []logging.Severity{ - logging.Default, // Default, 0 - logging.Debug, // - logging.Debug, // - logging.Debug, // - logging.Debug, // - logging.Debug, // - logging.Debug, // - logging.Debug, // - logging.Debug, // 1-8 -> Debug - logging.Info, // - logging.Info, // 9-10 -> Info - logging.Notice, // - logging.Notice, // 11-12 -> Notice - logging.Warning, // - logging.Warning, // - logging.Warning, // - logging.Warning, // 13-16 -> Warning - logging.Error, // - logging.Error, // - logging.Error, // - logging.Error, // 17-20 -> Error - logging.Critical, // - logging.Critical, // 21-22 -> Critical - logging.Alert, // 23 -> Alert - logging.Emergency, // 24 -> Emergency +var severityMapping = []logtypepb.LogSeverity{ + logtypepb.LogSeverity_DEFAULT, // Default, 0 + logtypepb.LogSeverity_DEBUG, // + logtypepb.LogSeverity_DEBUG, // + logtypepb.LogSeverity_DEBUG, // + logtypepb.LogSeverity_DEBUG, // + logtypepb.LogSeverity_DEBUG, // + logtypepb.LogSeverity_DEBUG, // + logtypepb.LogSeverity_DEBUG, // + logtypepb.LogSeverity_DEBUG, // 1-8 -> Debug + logtypepb.LogSeverity_INFO, // + logtypepb.LogSeverity_INFO, // 9-10 -> Info + logtypepb.LogSeverity_NOTICE, // + logtypepb.LogSeverity_NOTICE, // 11-12 -> Notice + logtypepb.LogSeverity_WARNING, // + logtypepb.LogSeverity_WARNING, // + logtypepb.LogSeverity_WARNING, // + logtypepb.LogSeverity_WARNING, // 13-16 -> Warning + logtypepb.LogSeverity_ERROR, // + logtypepb.LogSeverity_ERROR, // + logtypepb.LogSeverity_ERROR, // + logtypepb.LogSeverity_ERROR, // 17-20 -> Error + logtypepb.LogSeverity_CRITICAL, // + logtypepb.LogSeverity_CRITICAL, // 21-22 -> Critical + logtypepb.LogSeverity_ALERT, // 23 -> Alert + logtypepb.LogSeverity_EMERGENCY, // 24 -> Emergency } // otelSeverityForText maps the generic aliases of SeverityTexts to SeverityNumbers. @@ -292,19 +296,14 @@ func (l logMapper) createEntries(ld plog.Logs) (map[string][]*logpb.LogEntry, er continue } - for splitIndex, entry := range splitEntries { - internalLogEntry, err := l.logEntryToInternal(entry, logName, projectID, mr, len(splitEntries), splitIndex) - if err != nil { - errors = append(errors, err) - continue - } + for _, entry := range splitEntries { if l.cfg.DestinationProjectQuota { projectMapKey = projectID } if _, ok := entries[projectMapKey]; !ok { entries[projectMapKey] = make([]*logpb.LogEntry, 0) } - entries[projectMapKey] = append(entries[projectMapKey], internalLogEntry) + entries[projectMapKey] = append(entries[projectMapKey], entry) } } } @@ -325,31 +324,6 @@ func mergeLogLabels(instrumentationSource, instrumentationVersion string, resour return mergeLabels(labelsMap, resourceLabels) } -func (l logMapper) logEntryToInternal( - entry logging.Entry, - logName string, - projectID string, - mr *monitoredrespb.MonitoredResource, - splits int, - splitIndex int, -) (*logpb.LogEntry, error) { - internalLogEntry, err := logging.ToLogEntry(entry, fmt.Sprintf("projects/%s", projectID)) - if err != nil { - return nil, err - } - - internalLogEntry.LogName = fmt.Sprintf("projects/%s/logs/%s", projectID, url.PathEscape(logName)) - internalLogEntry.Resource = mr - if splits > 1 { - internalLogEntry.Split = &logpb.LogSplit{ - Uid: fmt.Sprintf("%s-%s", logName, entry.Timestamp.String()), - Index: int32(splitIndex), - TotalSplits: int32(splits), - } - } - return internalLogEntry, nil -} - func (l *LogsExporter) writeLogEntries(ctx context.Context, batch []*logpb.LogEntry) (*logpb.WriteLogEntriesResponse, error) { request := &logpb.WriteLogEntriesRequest{ PartialSuccess: true, @@ -378,27 +352,30 @@ func (l logMapper) logToSplitEntries( processTime time.Time, logName string, projectID string, -) ([]logging.Entry, error) { +) ([]*logpb.LogEntry, error) { // make a copy in case we mutate the record logRecord := plog.NewLogRecord() log.CopyTo(logRecord) - entry := logging.Entry{ - Resource: mr, - } - - entry.Timestamp = logRecord.Timestamp().AsTime() - if logRecord.Timestamp() == 0 { + ts := logRecord.Timestamp().AsTime() + if logRecord.Timestamp() == 0 || ts.IsZero() { // if timestamp is unset, fall back to observed_time_unix_nano as recommended // (see https://github.com/open-telemetry/opentelemetry-proto/blob/4abbb78/opentelemetry/proto/logs/v1/logs.proto#L176-L179) if logRecord.ObservedTimestamp() != 0 { - entry.Timestamp = logRecord.ObservedTimestamp().AsTime() + ts = logRecord.ObservedTimestamp().AsTime() } else { // if observed_time is 0, use the current time - entry.Timestamp = processTime + ts = processTime } } + entry := &logpb.LogEntry{ + Resource: mr, + Timestamp: timestamppb.New(ts), + Labels: logLabels, + LogName: fmt.Sprintf("projects/%s/logs/%s", projectID, url.PathEscape(logName)), + } + // build our own map off OTel attributes so we don't have to call .Get() for each special case // (.Get() ranges over all attributes each time) attrsMap := make(map[string]pcommon.Value) @@ -412,7 +389,7 @@ func (l logMapper) logToSplitEntries( var logEntrySourceLocation logpb.LogEntrySourceLocation err := json.Unmarshal(sourceLocation.Bytes().AsRaw(), &logEntrySourceLocation) if err != nil { - return []logging.Entry{entry}, err + return nil, err } entry.SourceLocation = &logEntrySourceLocation delete(attrsMap, SourceLocationAttributeKey) @@ -429,21 +406,20 @@ func (l logMapper) logToSplitEntries( entry.Trace = fmt.Sprintf("projects/%s/traces/%s", projectID, hex.EncodeToString(traceID[:])) } if spanID := logRecord.SpanID(); !spanID.IsEmpty() { - entry.SpanID = hex.EncodeToString(spanID[:]) + entry.SpanId = hex.EncodeToString(spanID[:]) } if httpRequestAttr, ok := attrsMap[HTTPRequestAttributeKey]; ok { - var httpRequest *logging.HTTPRequest httpRequest, err := l.parseHTTPRequest(httpRequestAttr) if err != nil { l.obs.log.Debug("Unable to parse httpRequest", zap.Error(err)) } - entry.HTTPRequest = httpRequest + entry.HttpRequest = httpRequest delete(attrsMap, HTTPRequestAttributeKey) } if logRecord.SeverityNumber() < 0 || int(logRecord.SeverityNumber()) > len(severityMapping)-1 { - return []logging.Entry{entry}, fmt.Errorf("unknown SeverityNumber %v", logRecord.SeverityNumber()) + return nil, fmt.Errorf("unknown SeverityNumber %v", logRecord.SeverityNumber()) } severityNumber := logRecord.SeverityNumber() // Log severity levels are based on numerical values defined by Otel/GCP, which are informally mapped to generic text values such as "ALERT", "Debug", etc. @@ -468,47 +444,52 @@ func (l logMapper) logToSplitEntries( logRecord.Body().Map().PutStr(GCPTypeKey, GCPErrorReportingTypeValue) } - entry.Labels = logLabels - // parse remaining OTel attributes to GCP labels for k, v := range attrsMap { // skip "gcp.*" attributes since we process these to other fields if strings.HasPrefix(k, "gcp.") { continue } - if entry.Labels == nil { - entry.Labels = make(map[string]string) - } if _, ok := entry.Labels[k]; !ok { entry.Labels[k] = v.AsString() } } - // Calculate the size of the internal log entry so this overhead can be accounted - // for when determining the need to split based on payload size - // TODO(damemi): Find an appropriate estimated buffer to account for the LogSplit struct as well - logOverhead, err := l.logEntryToInternal(entry, logName, projectID, mr, 0, 0) - if err != nil { - return []logging.Entry{entry}, err - } - // make a copy so the proto initialization doesn't modify the original entry - overheadClone := proto.Clone(logOverhead) - overheadBytes := proto.Size(overheadClone) - - payload, splits, err := parseEntryPayload(logRecord.Body(), l.maxEntrySize-overheadBytes) - if err != nil { - return []logging.Entry{entry}, err + if len(logRecord.Body().AsString()) == 0 { + return []*logpb.LogEntry{entry}, nil } - // Split log entries with a string payload into fewer entries - if splits > 1 { - entries := make([]logging.Entry, splits) - payloadString := payload.(string) - + switch logRecord.Body().Type() { + case pcommon.ValueTypeBytes: + s, err := toProtoStruct(logRecord.Body().Bytes().AsRaw()) + if err != nil { + return nil, err + } + entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s} + case pcommon.ValueTypeMap: + s, err := structpb.NewStruct(logRecord.Body().Map().AsRaw()) + if err != nil { + return nil, err + } + entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s} + case pcommon.ValueTypeStr: + // Calculate the size of the internal log entry so this overhead can be accounted + // for when determining the need to split based on payload size + // TODO(damemi): Find an appropriate estimated buffer to account for the LogSplit struct as well + overheadBytes := proto.Size(entry) + // Split log entries with a string payload into fewer entries + payloadString := logRecord.Body().Str() + splits := int(math.Ceil(float64(len([]byte(payloadString))) / float64(l.maxEntrySize-overheadBytes))) + if splits <= 1 { + entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: payloadString} + return []*logpb.LogEntry{entry}, nil + } + entries := make([]*logpb.LogEntry, splits) // Start by assuming all splits will be even (this may not be the case) startIndex := 0 endIndex := int(math.Floor((1.0 / float64(splits)) * float64(len(payloadString)))) - for i := 1; i <= splits; i++ { + for i := 0; i < splits; i++ { + newEntry := proto.Clone(entry).(*logpb.LogEntry) currentSplit := payloadString[startIndex:endIndex] // If the current split is larger than the entry size, iterate until it is within the max @@ -517,35 +498,23 @@ func (l logMapper) logToSplitEntries( endIndex-- currentSplit = payloadString[startIndex:endIndex] } - entries[i-1] = entry - entries[i-1].Payload = currentSplit + newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: currentSplit} + newEntry.Split = &logpb.LogSplit{ + Uid: fmt.Sprintf("%s-%s", logName, entry.Timestamp.AsTime().String()), + Index: int32(i), + TotalSplits: int32(splits), + } + entries[i] = newEntry // Update slice indices to the next chunk startIndex = endIndex - endIndex = int(math.Floor((float64(i+1) / float64(splits)) * float64(len(payloadString)))) + endIndex = int(math.Floor((float64(i+2) / float64(splits)) * float64(len(payloadString)))) } return entries, nil - } - - entry.Payload = payload - return []logging.Entry{entry}, nil -} - -func parseEntryPayload(logBody pcommon.Value, maxEntrySize int) (interface{}, int, error) { - if len(logBody.AsString()) == 0 { - return nil, 0, nil - } - switch logBody.Type() { - case pcommon.ValueTypeBytes: - return logBody.Bytes().AsRaw(), 1, nil - case pcommon.ValueTypeStr: - return logBody.AsString(), int(math.Ceil(float64(len([]byte(logBody.AsString()))) / float64(maxEntrySize))), nil - case pcommon.ValueTypeMap: - return logBody.Map().AsRaw(), 1, nil - default: - return nil, 0, fmt.Errorf("unknown log body value %v", logBody.Type().String()) + return nil, fmt.Errorf("unknown log body value %v", logRecord.Body().Type().String()) } + return []*logpb.LogEntry{entry}, nil } // JSON keys derived from: @@ -562,54 +531,93 @@ type httpRequestLog struct { ResponseSize int64 `json:"responseSize,string"` RequestSize int64 `json:"requestSize,string"` CacheFillBytes int64 `json:"cacheFillBytes,string"` - Status int `json:"status,string"` + Status int32 `json:"status,string"` CacheLookup bool `json:"cacheLookup"` CacheHit bool `json:"cacheHit"` CacheValidatedWithOriginServer bool `json:"cacheValidatedWithOriginServer"` } -func (l logMapper) parseHTTPRequest(httpRequestAttr pcommon.Value) (*logging.HTTPRequest, error) { - var bytes []byte +func (l logMapper) parseHTTPRequest(httpRequestAttr pcommon.Value) (*logtypepb.HttpRequest, error) { + var httpBytes []byte switch httpRequestAttr.Type() { case pcommon.ValueTypeBytes: - bytes = httpRequestAttr.Bytes().AsRaw() + httpBytes = httpRequestAttr.Bytes().AsRaw() case pcommon.ValueTypeStr, pcommon.ValueTypeMap: - bytes = []byte(httpRequestAttr.AsString()) + httpBytes = []byte(httpRequestAttr.AsString()) } // TODO: Investigate doing this without the JSON unmarshal. Getting the attribute as a map // instead of a slice of bytes could do, but would need a lot of type casting and checking // assertions with it. var parsedHTTPRequest httpRequestLog - if err := json.Unmarshal(bytes, &parsedHTTPRequest); err != nil { + if err := json.Unmarshal(httpBytes, &parsedHTTPRequest); err != nil { return nil, err } - req, err := http.NewRequest(parsedHTTPRequest.RequestMethod, parsedHTTPRequest.RequestURL, nil) - if err != nil { - return nil, err - } - req.Header.Set("Referer", parsedHTTPRequest.Referer) - req.Header.Set("User-Agent", parsedHTTPRequest.UserAgent) - - httpRequest := &logging.HTTPRequest{ - Request: req, + pb := &logtypepb.HttpRequest{ + RequestMethod: parsedHTTPRequest.RequestMethod, + RequestUrl: fixUTF8(parsedHTTPRequest.RequestURL), RequestSize: parsedHTTPRequest.RequestSize, Status: parsedHTTPRequest.Status, ResponseSize: parsedHTTPRequest.ResponseSize, - LocalIP: parsedHTTPRequest.ServerIP, - RemoteIP: parsedHTTPRequest.RemoteIP, + UserAgent: parsedHTTPRequest.UserAgent, + ServerIp: parsedHTTPRequest.ServerIP, + RemoteIp: parsedHTTPRequest.RemoteIP, + Referer: parsedHTTPRequest.Referer, CacheHit: parsedHTTPRequest.CacheHit, CacheValidatedWithOriginServer: parsedHTTPRequest.CacheValidatedWithOriginServer, + Protocol: "HTTP/1.1", CacheFillBytes: parsedHTTPRequest.CacheFillBytes, CacheLookup: parsedHTTPRequest.CacheLookup, } if parsedHTTPRequest.Latency != "" { latency, err := time.ParseDuration(parsedHTTPRequest.Latency) - if err == nil { - httpRequest.Latency = latency + if err == nil && latency != 0 { + pb.Latency = durationpb.New(latency) } } + return pb, nil +} - return httpRequest, nil +// toProtoStruct converts v, which must marshal into a JSON object, +// into a Google Struct proto. +// Mostly copied from +// https://github.com/googleapis/google-cloud-go/blob/69705144832c715cf23832602ad9338b911dff9a/logging/logging.go#L577 +func toProtoStruct(v any) (*structpb.Struct, error) { + // v is a Go value that supports JSON marshaling. We want a Struct + // protobuf. Some day we may have a more direct way to get there, but right + // now the only way is to marshal the Go value to JSON, unmarshal into a + // map, and then build the Struct proto from the map. + jb, err := json.Marshal(v) + if err != nil { + return nil, fmt.Errorf("logging: json.Marshal: %w", err) + } + var m map[string]any + err = json.Unmarshal(jb, &m) + if err != nil { + return nil, fmt.Errorf("logging: json.Unmarshal: %w", err) + } + return structpb.NewStruct(m) +} + +// fixUTF8 is a helper that fixes an invalid UTF-8 string by replacing +// invalid UTF-8 runes with the Unicode replacement character (U+FFFD). +// See Issue https://github.com/googleapis/google-cloud-go/issues/1383. +// Coped from https://github.com/googleapis/google-cloud-go/blob/69705144832c715cf23832602ad9338b911dff9a/logging/logging.go#L557 +func fixUTF8(s string) string { + if utf8.ValidString(s) { + return s + } + + // Otherwise time to build the sequence. + buf := new(bytes.Buffer) + buf.Grow(len(s)) + for _, r := range s { + if utf8.ValidRune(r) { + buf.WriteRune(r) + } else { + buf.WriteRune('\uFFFD') + } + } + return buf.String() } diff --git a/exporter/collector/logs_test.go b/exporter/collector/logs_test.go index 0fa113754..fbc4735fc 100644 --- a/exporter/collector/logs_test.go +++ b/exporter/collector/logs_test.go @@ -17,8 +17,6 @@ package collector import ( "encoding/hex" "fmt" - "io" - "net/http" "testing" "time" @@ -29,6 +27,10 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + logtypepb "google.golang.org/genproto/googleapis/logging/type" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" ) type Option func(*Config) @@ -58,13 +60,14 @@ func TestLogMapping(t *testing.T) { testSpanID := pcommon.SpanID([8]byte{ 0, 0, 0, 0, 0, 0, 0, 1, }) + logName := "projects/fakeprojectid/logs/default-log" testCases := []struct { log func() plog.LogRecord mr func() *monitoredrespb.MonitoredResource config Option name string - expectedEntries []logging.Entry + expectedEntries []*logpb.LogEntry maxEntrySize int expectError bool }{ @@ -76,14 +79,26 @@ func TestLogMapping(t *testing.T) { log.Body().SetStr("abcxyz") return log }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { - Payload: "abc", - Timestamp: testObservedTime, + LogName: logName, + Payload: &logpb.LogEntry_TextPayload{TextPayload: "abc"}, + Timestamp: timestamppb.New(testObservedTime), + Split: &logpb.LogSplit{ + Uid: fmt.Sprintf("default-log-%s", testObservedTime.String()), + Index: 0, + TotalSplits: 2, + }, }, { - Payload: "xyz", - Timestamp: testObservedTime, + LogName: logName, + Payload: &logpb.LogEntry_TextPayload{TextPayload: "xyz"}, + Timestamp: timestamppb.New(testObservedTime), + Split: &logpb.LogSplit{ + Uid: fmt.Sprintf("default-log-%s", testObservedTime.String()), + Index: 1, + TotalSplits: 2, + }, }, }, mr: func() *monitoredrespb.MonitoredResource { @@ -98,8 +113,9 @@ func TestLogMapping(t *testing.T) { mr: func() *monitoredrespb.MonitoredResource { return nil }, - expectedEntries: []logging.Entry{{ - Timestamp: testObservedTime, + expectedEntries: []*logpb.LogEntry{{ + LogName: logName, + Timestamp: timestamppb.New(testObservedTime), }}, maxEntrySize: defaultMaxEntrySize, }, @@ -107,16 +123,19 @@ func TestLogMapping(t *testing.T) { name: "log with json, empty monitoredresource", log: func() plog.LogRecord { log := plog.NewLogRecord() - log.Body().SetEmptyBytes().FromRaw([]byte(`{"this": "is json"}`)) + log.Body().SetEmptyMap().PutStr("this", "is json") return log }, mr: func() *monitoredrespb.MonitoredResource { return nil }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { - Payload: []byte(`{"this": "is json"}`), - Timestamp: testObservedTime, + LogName: logName, + Timestamp: timestamppb.New(testObservedTime), + Payload: &logpb.LogEntry_JsonPayload{JsonPayload: &structpb.Struct{Fields: map[string]*structpb.Value{ + "this": {Kind: &structpb.Value_StringValue{StringValue: "is json"}}, + }}}, }, }, maxEntrySize: defaultMaxEntrySize, @@ -125,7 +144,7 @@ func TestLogMapping(t *testing.T) { name: "log with json and httpRequest, empty monitoredresource", log: func() plog.LogRecord { log := plog.NewLogRecord() - log.Body().SetEmptyBytes().FromRaw([]byte(`{"message": "hello!"}`)) + log.Body().SetEmptyMap().PutStr("message", "hello!") log.Attributes().PutEmptyBytes(HTTPRequestAttributeKey).FromRaw([]byte(`{ "requestMethod": "GET", "requestURL": "https://www.example.com", @@ -146,17 +165,24 @@ func TestLogMapping(t *testing.T) { mr: func() *monitoredrespb.MonitoredResource { return nil }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { - Payload: []byte(`{"message": "hello!"}`), - Timestamp: testObservedTime, - HTTPRequest: &logging.HTTPRequest{ - Request: makeExpectedHTTPReq("GET", "https://www.example.com", "https://www.example2.com", "test", nil), + LogName: logName, + Timestamp: timestamppb.New(testObservedTime), + Payload: &logpb.LogEntry_JsonPayload{JsonPayload: &structpb.Struct{Fields: map[string]*structpb.Value{ + "message": {Kind: &structpb.Value_StringValue{StringValue: "hello!"}}, + }}}, + HttpRequest: &logtypepb.HttpRequest{ + RequestMethod: "GET", + UserAgent: "test", + Referer: "https://www.example2.com", + RequestUrl: "https://www.example.com", + Protocol: "HTTP/1.1", RequestSize: 1, Status: 200, ResponseSize: 1, - LocalIP: "192.168.0.2", - RemoteIP: "192.168.0.1", + ServerIp: "192.168.0.2", + RemoteIp: "192.168.0.1", CacheHit: false, CacheValidatedWithOriginServer: false, CacheFillBytes: 1, @@ -176,9 +202,10 @@ func TestLogMapping(t *testing.T) { mr: func() *monitoredrespb.MonitoredResource { return nil }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { - Timestamp: testSampleTime, + LogName: logName, + Timestamp: timestamppb.New(testSampleTime), }, }, maxEntrySize: defaultMaxEntrySize, @@ -193,9 +220,10 @@ func TestLogMapping(t *testing.T) { mr: func() *monitoredrespb.MonitoredResource { return nil }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { - Timestamp: testSampleTime, + LogName: logName, + Timestamp: timestamppb.New(testSampleTime), }, }, maxEntrySize: defaultMaxEntrySize, @@ -210,10 +238,11 @@ func TestLogMapping(t *testing.T) { log.Body().SetStr("{\"message\": \"hello!\"}") return log }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { - Payload: `{"message": "hello!"}`, - Timestamp: testObservedTime, + LogName: logName, + Timestamp: timestamppb.New(testObservedTime), + Payload: &logpb.LogEntry_TextPayload{TextPayload: `{"message": "hello!"}`}, }, }, maxEntrySize: defaultMaxEntrySize, @@ -229,14 +258,15 @@ func TestLogMapping(t *testing.T) { log.Body().SetStr("{\"message\": \"hello!\"}") return log }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { - Payload: map[string]interface{}{ - GCPTypeKey: GCPErrorReportingTypeValue, - "message": `{"message": "hello!"}`, - }, - Timestamp: testObservedTime, - Severity: logging.Error, + LogName: logName, + Timestamp: timestamppb.New(testObservedTime), + Severity: logtypepb.LogSeverity(logging.Error), + Payload: &logpb.LogEntry_JsonPayload{JsonPayload: &structpb.Struct{Fields: map[string]*structpb.Value{ + GCPTypeKey: {Kind: &structpb.Value_StringValue{StringValue: GCPErrorReportingTypeValue}}, + "message": {Kind: &structpb.Value_StringValue{StringValue: `{"message": "hello!"}`}}, + }}}, }, }, maxEntrySize: defaultMaxEntrySize, @@ -255,14 +285,15 @@ func TestLogMapping(t *testing.T) { log.Body().SetStr("test string message") return log }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { - Payload: map[string]interface{}{ - GCPTypeKey: GCPErrorReportingTypeValue, - "message": "test string message", - }, - Timestamp: testObservedTime, - Severity: logging.Error, + LogName: logName, + Timestamp: timestamppb.New(testObservedTime), + Severity: logtypepb.LogSeverity(logging.Error), + Payload: &logpb.LogEntry_JsonPayload{JsonPayload: &structpb.Struct{Fields: map[string]*structpb.Value{ + GCPTypeKey: {Kind: &structpb.Value_StringValue{StringValue: GCPErrorReportingTypeValue}}, + "message": {Kind: &structpb.Value_StringValue{StringValue: "test string message"}}, + }}}, }, }, maxEntrySize: defaultMaxEntrySize, @@ -281,11 +312,12 @@ func TestLogMapping(t *testing.T) { log.Body().SetStr("test string message") return log }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { - Payload: "test string message", - Timestamp: testObservedTime, - Severity: logging.Warning, + LogName: logName, + Timestamp: timestamppb.New(testObservedTime), + Severity: logtypepb.LogSeverity(logging.Warning), + Payload: &logpb.LogEntry_TextPayload{TextPayload: "test string message"}, }, }, maxEntrySize: defaultMaxEntrySize, @@ -304,14 +336,15 @@ func TestLogMapping(t *testing.T) { log.Body().SetEmptyMap().PutStr("msg", "test map value") return log }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { - Payload: map[string]interface{}{ - GCPTypeKey: GCPErrorReportingTypeValue, - "msg": "test map value", - }, - Timestamp: testObservedTime, - Severity: logging.Error, + LogName: logName, + Timestamp: timestamppb.New(testObservedTime), + Severity: logtypepb.LogSeverity(logging.Error), + Payload: &logpb.LogEntry_JsonPayload{JsonPayload: &structpb.Struct{Fields: map[string]*structpb.Value{ + GCPTypeKey: {Kind: &structpb.Value_StringValue{StringValue: GCPErrorReportingTypeValue}}, + "msg": {Kind: &structpb.Value_StringValue{StringValue: "test map value"}}, + }}}, }, }, maxEntrySize: defaultMaxEntrySize, @@ -332,14 +365,15 @@ func TestLogMapping(t *testing.T) { ) return log }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { + LogName: logName, + Timestamp: timestamppb.New(testObservedTime), SourceLocation: &logpb.LogEntrySourceLocation{ File: "test.php", Line: 100, Function: "helloWorld", }, - Timestamp: testObservedTime, }, }, maxEntrySize: defaultMaxEntrySize, @@ -354,28 +388,11 @@ func TestLogMapping(t *testing.T) { log.Attributes().PutBool(TraceSampledAttributeKey, true) return log }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { + LogName: logName, + Timestamp: timestamppb.New(testObservedTime), TraceSampled: true, - Timestamp: testObservedTime, - }, - }, - maxEntrySize: defaultMaxEntrySize, - }, - { - name: "log with IsSampled", - mr: func() *monitoredrespb.MonitoredResource { - return nil - }, - log: func() plog.LogRecord { - log := plog.NewLogRecord() - log.SetFlags(log.Flags().WithIsSampled(true)) - return log - }, - expectedEntries: []logging.Entry{ - { - TraceSampled: true, - Timestamp: testObservedTime, }, }, maxEntrySize: defaultMaxEntrySize, @@ -391,11 +408,12 @@ func TestLogMapping(t *testing.T) { log.SetSpanID(testSpanID) return log }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { + LogName: logName, + Timestamp: timestamppb.New(testObservedTime), Trace: fmt.Sprintf("projects/fakeprojectid/traces/%s", hex.EncodeToString(testTraceID[:])), - SpanID: hex.EncodeToString(testSpanID[:]), - Timestamp: testObservedTime, + SpanId: hex.EncodeToString(testSpanID[:]), }, }, maxEntrySize: defaultMaxEntrySize, @@ -410,10 +428,11 @@ func TestLogMapping(t *testing.T) { log.SetSeverityNumber(plog.SeverityNumberFatal) return log }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { - Timestamp: testObservedTime, - Severity: logging.Critical, + LogName: logName, + Timestamp: timestamppb.New(testObservedTime), + Severity: logtypepb.LogSeverity(logging.Critical), }, }, maxEntrySize: defaultMaxEntrySize, @@ -441,10 +460,11 @@ func TestLogMapping(t *testing.T) { log.SetSeverityText("fatal3") return log }, - expectedEntries: []logging.Entry{ + expectedEntries: []*logpb.LogEntry{ { - Timestamp: testObservedTime, - Severity: logging.Alert, + LogName: logName, + Timestamp: timestamppb.New(testObservedTime), + Severity: logtypepb.LogSeverity(logging.Alert), }, }, maxEntrySize: defaultMaxEntrySize, @@ -470,7 +490,12 @@ func TestLogMapping(t *testing.T) { assert.NotNil(t, err) } else { assert.Nil(t, err) - assert.Equal(t, testCase.expectedEntries, entries) + assert.Equal(t, len(testCase.expectedEntries), len(entries)) + for i := range testCase.expectedEntries { + if !proto.Equal(testCase.expectedEntries[i], entries[i]) { + assert.Equal(t, testCase.expectedEntries[i], entries[i]) + } + } } }) } @@ -516,10 +541,3 @@ func TestGetLogName(t *testing.T) { }) } } - -func makeExpectedHTTPReq(method, url, referer, userAgent string, body io.Reader) *http.Request { - req, _ := http.NewRequest(method, url, body) - req.Header.Set("Referer", referer) - req.Header.Set("User-Agent", userAgent) - return req -}