Skip to content

Commit

Permalink
feat(lokicompliance): add more fields to generated logs
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Apr 26, 2024
1 parent 9397a8a commit d25e5d1
Showing 1 changed file with 107 additions and 38 deletions.
145 changes: 107 additions & 38 deletions internal/lokicompliance/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
"github.com/golang/snappy"
"github.com/grafana/loki/pkg/push"
ht "github.com/ogen-go/ogen/http"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -57,6 +60,7 @@ func (opts *GenerateOptions) setDefaults() {
// GenerateLogs generates logs and sends them to targets
func GenerateLogs(ctx context.Context, targets []string, opts GenerateOptions) error {
opts.setDefaults()
r := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec: G404

streams := map[string]push.Stream{}
for ts := opts.Start; ts.Before(opts.End); ts = ts.Add(opts.Step) {
Expand All @@ -66,7 +70,7 @@ func GenerateLogs(ctx context.Context, targets []string, opts GenerateOptions) e
// we add a slight difference to each timestamp within step.
rt = rt.Add(100 * time.Microsecond)

stream := randomElement(opts.Streams)
stream := randomElement(r, opts.Streams)
s, ok := streams[stream]
if !ok {
s = push.Stream{
Expand All @@ -76,10 +80,10 @@ func GenerateLogs(ctx context.Context, targets []string, opts GenerateOptions) e
),
}
}
line := getLine(rt)
line := NewLogEntry(r, rt)
s.Entries = append(s.Entries, push.Entry{
Timestamp: rt,
Line: line,
Line: line.JSON(),
})
streams[stream] = s
}
Expand Down Expand Up @@ -161,27 +165,38 @@ func sendLogs(ctx context.Context, client ht.Client, target string, body push.Pu
return nil
}

func getLine(ts time.Time) string {
e := jx.GetEncoder()
defer jx.PutEncoder(e)
type LogEntry struct {
Timestamp time.Time
Level plog.SeverityNumber
Method string
Status int
Took time.Duration
Size uint64
SpanID pcommon.SpanID
TraceID pcommon.TraceID
}

e.ObjStart()
e.Field("ts", func(e *jx.Encoder) {
e.Str(ts.String())
})
e.Field("method", func(e *jx.Encoder) {
method := randomElement([]string{
// NewLogEntry generates new [LogEntry].
func NewLogEntry(r *rand.Rand, ts time.Time) LogEntry {
return LogEntry{
Timestamp: ts,
Level: randomElement(r, []plog.SeverityNumber{
plog.SeverityNumberTrace,
plog.SeverityNumberDebug,
plog.SeverityNumberInfo,
plog.SeverityNumberWarn,
plog.SeverityNumberError,
plog.SeverityNumberFatal,
}),
Method: randomElement(r, []string{
http.MethodGet,
http.MethodHead,
http.MethodPost,
http.MethodPut,
http.MethodPatch,
http.MethodDelete,
})
e.Str(method)
})
e.Field("status", func(e *jx.Encoder) {
status := randomElement([]int{
}),
Status: randomElement(r, []int{
http.StatusOK,
http.StatusCreated,
http.StatusNoContent,
Expand All @@ -196,31 +211,85 @@ func getLine(ts time.Time) string {

http.StatusInternalServerError,
http.StatusNotImplemented,
})
e.Int(status)
})
e.Field("took", func(e *jx.Encoder) {
took := rand.Int63n(int64(30*time.Minute)) + int64(time.Millisecond) // #nosec: G404
e.Str(time.Duration(took).String())
})
e.Field("size", func(e *jx.Encoder) {
size := uint64(rand.Intn(1024*1024)) + 1025 // #nosec: G404
e.Str(humanize.Bytes(size))
})
e.ObjEnd()

return e.String()
}),
Took: time.Duration(r.Int63n(int64(30*time.Minute)) + int64(time.Millisecond)),
Size: uint64(r.Intn(1024*1024)) + 1025,
SpanID: fillRandom[pcommon.SpanID](r),
TraceID: fillRandom[pcommon.TraceID](r),
}
}

// GenerateResult is log generation result.
type GenerateResult struct {
StartMarker string
EndMarker string
func fillRandom[S ~[8]byte | ~[16]byte](r *rand.Rand) (s S) {
buf := make([]byte, len(s))
r.Read(buf)
return (S)(buf)
}

func randomElement[S ~[]T, T any](s S) (r T) {
func randomElement[S ~[]T, T any](r *rand.Rand, s S) (zero T) {
if len(s) == 0 {
return r
return zero
}
return s[r.Intn(len(s))]
}

// JSON returns JSON-encoded line.
func (e LogEntry) JSON() string {
enc := jx.GetEncoder()
defer jx.PutEncoder(enc)

e.EncodeJSON(enc)
return enc.String()
}

// EncodeJSON encodes entry to given encoder.
func (e LogEntry) EncodeJSON(enc *jx.Encoder) {
enc.ObjStart()
enc.Field("ts", func(enc *jx.Encoder) {
enc.Str(e.Timestamp.String())
})
enc.Field("level", func(enc *jx.Encoder) {
enc.Str(e.Level.String())
})
enc.Field("method", func(enc *jx.Encoder) {
enc.Str(e.Method)
})
enc.Field("status", func(enc *jx.Encoder) {
enc.Int(e.Status)
})
enc.Field("took", func(enc *jx.Encoder) {
enc.Str(e.Took.String())
})
enc.Field("size", func(enc *jx.Encoder) {
enc.Str(humanize.Bytes(e.Size))
})
if id := e.SpanID; !id.IsEmpty() {
enc.Field("span_id", func(enc *jx.Encoder) {
enc.Str(id.String())
})
}
if id := e.TraceID; !id.IsEmpty() {
enc.Field("trace_id", func(enc *jx.Encoder) {
enc.Str(id.String())
})
}
enc.ObjEnd()
}

// OTEL returns OpenTelemetry log record.
func (e LogEntry) OTEL(r plog.LogRecord) {
ts := pcommon.NewTimestampFromTime(e.Timestamp)
r.SetObservedTimestamp(ts)
r.SetTimestamp(ts)
r.SetTraceID(e.TraceID)
r.SetSpanID(e.SpanID)
r.SetFlags(plog.DefaultLogRecordFlags)
r.SetSeverityText(e.Level.String())
r.SetSeverityNumber(e.Level)
r.Body().SetStr(e.JSON())
{
a := r.Attributes()
a.PutStr(string(semconv.HTTPMethodKey), e.Method)
a.PutInt(string(semconv.HTTPStatusCodeKey), int64(e.Status))
}
return s[rand.Intn(len(s))] // #nosec: G404
r.SetDroppedAttributesCount(0)
}

0 comments on commit d25e5d1

Please sign in to comment.