diff --git a/internal/flameql/error.go b/internal/flameql/error.go new file mode 100644 index 0000000..b910cc6 --- /dev/null +++ b/internal/flameql/error.go @@ -0,0 +1,45 @@ +package flameql + +import ( + "errors" + "fmt" +) + +var ( + ErrInvalidQuerySyntax = errors.New("invalid query syntax") + ErrInvalidAppName = errors.New("invalid application name") + ErrInvalidMatchersSyntax = errors.New("invalid tag matchers syntax") + ErrInvalidTagKey = errors.New("invalid tag key") + ErrInvalidTagValueSyntax = errors.New("invalid tag value syntax") + + ErrAppNameIsRequired = errors.New("application name is required") + ErrTagKeyIsRequired = errors.New("tag key is required") + ErrTagKeyReserved = errors.New("tag key is reserved") + + ErrMatchOperatorIsRequired = errors.New("match operator is required") + ErrUnknownOp = errors.New("unknown tag match operator") +) + +type Error struct { + Inner error + Expr string + // TODO: add offset? +} + +func newErr(err error, expr string) *Error { return &Error{Inner: err, Expr: expr} } + +func (e *Error) Error() string { return e.Inner.Error() + ": " + e.Expr } + +func (e *Error) Unwrap() error { return e.Inner } + +func newInvalidTagKeyRuneError(k string, r rune) *Error { + return newInvalidRuneError(ErrInvalidTagKey, k, r) +} + +func newInvalidAppNameRuneError(k string, r rune) *Error { + return newInvalidRuneError(ErrInvalidAppName, k, r) +} + +func newInvalidRuneError(err error, k string, r rune) *Error { + return newErr(err, fmt.Sprintf("%s: character is not allowed: %q", k, r)) +} diff --git a/internal/flameql/flameql.go b/internal/flameql/flameql.go new file mode 100644 index 0000000..94eb15c --- /dev/null +++ b/internal/flameql/flameql.go @@ -0,0 +1,114 @@ +package flameql + +import "regexp" + +type Query struct { + AppName string + Matchers []*TagMatcher + + q string // The original query string. +} + +func (q *Query) String() string { return q.q } + +type TagMatcher struct { + Key string + Value string + Op + + R *regexp.Regexp +} + +type Op int + +const ( + // The order should respect operator priority and cost. + // Negating operators go first. See IsNegation. + _ Op = iota + OpNotEqual // != + OpNotEqualRegex // !~ + OpEqual // = + OpEqualRegex // =~ +) + +const ( + ReservedTagKeyName = "__name__" +) + +var reservedTagKeys = []string{ + ReservedTagKeyName, +} + +// IsNegation reports whether the operator assumes negation. +func (o Op) IsNegation() bool { return o < OpEqual } + +// ByPriority is a supplemental type for sorting tag matchers. +type ByPriority []*TagMatcher + +func (p ByPriority) Len() int { return len(p) } +func (p ByPriority) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p ByPriority) Less(i, j int) bool { return p[i].Op < p[j].Op } + +func (m *TagMatcher) Match(v string) bool { + switch m.Op { + case OpEqual: + return m.Value == v + case OpNotEqual: + return m.Value != v + case OpEqualRegex: + return m.R.Match([]byte(v)) + case OpNotEqualRegex: + return !m.R.Match([]byte(v)) + default: + panic("invalid match operator") + } +} + +// ValidateTagKey report an error if the given key k violates constraints. +// +// The function should be used to validate user input. The function returns +// ErrTagKeyReserved if the key is valid but reserved for internal use. +func ValidateTagKey(k string) error { + if len(k) == 0 { + return ErrTagKeyIsRequired + } + for _, r := range k { + if !IsTagKeyRuneAllowed(r) { + return newInvalidTagKeyRuneError(k, r) + } + } + if IsTagKeyReserved(k) { + return newErr(ErrTagKeyReserved, k) + } + return nil +} + +// ValidateAppName report an error if the given app name n violates constraints. +func ValidateAppName(n string) error { + if len(n) == 0 { + return ErrAppNameIsRequired + } + for _, r := range n { + if !IsAppNameRuneAllowed(r) { + return newInvalidAppNameRuneError(n, r) + } + } + return nil +} + +func IsTagKeyRuneAllowed(r rune) bool { + return (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '_' +} + +func IsAppNameRuneAllowed(r rune) bool { + return r == '-' || r == '.' || IsTagKeyRuneAllowed(r) +} + +func IsTagKeyReserved(k string) bool { + for _, s := range reservedTagKeys { + if s == k { + return true + } + } + return false +} diff --git a/internal/flameql/key.go b/internal/flameql/key.go new file mode 100644 index 0000000..236e212 --- /dev/null +++ b/internal/flameql/key.go @@ -0,0 +1,221 @@ +package flameql + +import ( + "errors" + "strconv" + "strings" + "time" + + "github.com/pyroscope-io/pyroscope-lambda-extension/internal/sortedmap" +) + +type Key struct { + labels map[string]string +} + +type ParserState int + +const ( + nameParserState ParserState = iota + tagKeyParserState + tagValueParserState + doneParserState +) + +func NewKey(labels map[string]string) *Key { return &Key{labels: labels} } + +func ParseKey(name string) (*Key, error) { + k := &Key{labels: make(map[string]string)} + p := parser{parserState: nameParserState} + var err error + for _, r := range name + "{" { + switch p.parserState { + case nameParserState: + err = p.nameParserCase(r, k) + case tagKeyParserState: + p.tagKeyParserCase(r) + case tagValueParserState: + err = p.tagValueParserCase(r, k) + } + if err != nil { + return nil, err + } + } + return k, nil +} + +type parser struct { + parserState ParserState + key string + value string +} + +// ParseKey's nameParserState switch case +func (p *parser) nameParserCase(r int32, k *Key) error { + switch r { + case '{': + p.parserState = tagKeyParserState + appName := strings.TrimSpace(p.value) + if err := ValidateAppName(appName); err != nil { + return err + } + k.labels["__name__"] = appName + default: + p.value += string(r) + } + return nil +} + +// ParseKey's tagKeyParserState switch case +func (p *parser) tagKeyParserCase(r int32) { + switch r { + case '}': + p.parserState = doneParserState + case '=': + p.parserState = tagValueParserState + p.value = "" + default: + p.key += string(r) + } +} + +// ParseKey's tagValueParserState switch case +func (p *parser) tagValueParserCase(r int32, k *Key) error { + switch r { + case ',', '}': + p.parserState = tagKeyParserState + key := strings.TrimSpace(p.key) + if !IsTagKeyReserved(key) { + if err := ValidateTagKey(key); err != nil { + return err + } + } + k.labels[key] = strings.TrimSpace(p.value) + p.key = "" + default: + p.value += string(r) + } + return nil +} + +func (k *Key) SegmentKey() string { + return k.Normalized() +} + +func TreeKey(k string, depth int, unixTime int64) string { + return k + ":" + strconv.Itoa(depth) + ":" + strconv.FormatInt(unixTime, 10) +} + +func (k *Key) TreeKey(depth int, t time.Time) string { + return TreeKey(k.Normalized(), depth, t.Unix()) +} + +var errKeyInvalid = errors.New("invalid key") + +// ParseTreeKey retrieves tree time and depth level from the given key. +func ParseTreeKey(k string) (time.Time, int, error) { + a := strings.Split(k, ":") + if len(a) < 3 { + return time.Time{}, 0, errKeyInvalid + } + level, err := strconv.Atoi(a[1]) + if err != nil { + return time.Time{}, 0, err + } + v, err := strconv.Atoi(a[2]) + if err != nil { + return time.Time{}, 0, err + } + return time.Unix(int64(v), 0), level, err +} + +func (k *Key) DictKey() string { + return k.labels["__name__"] +} + +// FromTreeToDictKey returns app name from tree key k: given tree key +// "foo{}:0:1234567890", the call returns "foo". +// +// Before tags support, segment key form (i.e. app name + tags: foo{key=value}) +// has been used to reference a dictionary (trie). +func FromTreeToDictKey(k string) string { + return k[0:strings.IndexAny(k, "{")] +} + +func (k *Key) Normalized() string { + var sb strings.Builder + + sortedMap := sortedmap.New() + for k, v := range k.labels { + if k == "__name__" { + sb.WriteString(v) + } else { + sortedMap.Put(k, v) + } + } + + sb.WriteString("{") + for i, k := range sortedMap.Keys() { + v := sortedMap.Get(k).(string) + if i != 0 { + sb.WriteString(",") + } + sb.WriteString(k) + sb.WriteString("=") + sb.WriteString(v) + } + sb.WriteString("}") + + return sb.String() +} + +func (k *Key) AppName() string { + return k.labels["__name__"] +} + +func (k *Key) Labels() map[string]string { + return k.labels +} + +func (k *Key) Add(key, value string) { + if value == "" { + delete(k.labels, key) + } else { + k.labels[key] = value + } +} + +// Match reports whether the key matches the query. +func (k *Key) Clone() *Key { + newMap := make(map[string]string) + for k, v := range k.labels { + newMap[k] = v + } + return &Key{labels: newMap} +} + +func (k *Key) Match(q *Query) bool { + if k.AppName() != q.AppName { + return false + } + for _, m := range q.Matchers { + var ok bool + for labelKey, labelValue := range k.labels { + if m.Key != labelKey { + continue + } + if m.Match(labelValue) { + if !m.IsNegation() { + ok = true + break + } + } else if m.IsNegation() { + return false + } + } + if !ok && !m.IsNegation() { + return false + } + } + return true +} diff --git a/internal/flameql/parse.go b/internal/flameql/parse.go new file mode 100644 index 0000000..282091b --- /dev/null +++ b/internal/flameql/parse.go @@ -0,0 +1,174 @@ +package flameql + +import ( + "regexp" + "sort" + "strings" +) + +// ParseQuery parses a string of $app_name<{<$tag_matchers>}> form. +func ParseQuery(s string) (*Query, error) { + s = strings.TrimSpace(s) + q := Query{q: s} + + for offset, c := range s { + switch c { + case '{': + if offset == 0 { + return nil, ErrAppNameIsRequired + } + if s[len(s)-1] != '}' { + return nil, newErr(ErrInvalidQuerySyntax, "expected } at the end") + } + m, err := ParseMatchers(s[offset+1 : len(s)-1]) + if err != nil { + return nil, err + } + q.AppName = s[:offset] + q.Matchers = m + return &q, nil + default: + if !IsAppNameRuneAllowed(c) { + return nil, newErr(ErrInvalidAppName, s[:offset+1]) + } + } + } + + if len(s) == 0 { + return nil, ErrAppNameIsRequired + } + + q.AppName = s + return &q, nil +} + +// ParseMatchers parses a string of $tag_matcher<,$tag_matchers> form. +func ParseMatchers(s string) ([]*TagMatcher, error) { + var matchers []*TagMatcher + for _, t := range split(s) { + if t == "" { + continue + } + m, err := ParseMatcher(strings.TrimSpace(t)) + if err != nil { + return nil, err + } + matchers = append(matchers, m) + } + if len(matchers) == 0 && len(s) != 0 { + return nil, newErr(ErrInvalidMatchersSyntax, s) + } + sort.Sort(ByPriority(matchers)) + return matchers, nil +} + +// ParseMatcher parses a string of $tag_key$op"$tag_value" form, +// where $op is one of the supported match operators. +func ParseMatcher(s string) (*TagMatcher, error) { + var tm TagMatcher + var offset int + var c rune + +loop: + for offset, c = range s { + r := len(s) - (offset + 1) + switch c { + case '=': + switch { + case r <= 2: + return nil, newErr(ErrInvalidTagValueSyntax, s) + case s[offset+1] == '"': + tm.Op = OpEqual + case s[offset+1] == '~': + if r <= 3 { + return nil, newErr(ErrInvalidTagValueSyntax, s) + } + tm.Op = OpEqualRegex + default: + // Just for more meaningful error message. + if s[offset+2] != '"' { + return nil, newErr(ErrInvalidTagValueSyntax, s) + } + return nil, newErr(ErrUnknownOp, s) + } + break loop + case '!': + if r <= 3 { + return nil, newErr(ErrInvalidTagValueSyntax, s) + } + switch s[offset+1] { + case '=': + tm.Op = OpNotEqual + case '~': + tm.Op = OpNotEqualRegex + default: + return nil, newErr(ErrUnknownOp, s) + } + break loop + default: + if !IsTagKeyRuneAllowed(c) { + return nil, newInvalidTagKeyRuneError(s, c) + } + } + } + + k := s[:offset] + if IsTagKeyReserved(k) { + return nil, newErr(ErrTagKeyReserved, k) + } + + var v string + var ok bool + switch tm.Op { + default: + return nil, newErr(ErrMatchOperatorIsRequired, s) + case OpEqual: + v, ok = unquote(s[offset+1:]) + case OpNotEqual, OpEqualRegex, OpNotEqualRegex: + v, ok = unquote(s[offset+2:]) + } + if !ok { + return nil, newErr(ErrInvalidTagValueSyntax, v) + } + + // Compile regex, if applicable. + switch tm.Op { + case OpEqualRegex, OpNotEqualRegex: + r, err := regexp.Compile(v) + if err != nil { + return nil, newErr(err, v) + } + tm.R = r + } + + tm.Key = k + tm.Value = v + return &tm, nil +} + +func unquote(s string) (string, bool) { + if s[0] != '"' || s[len(s)-1] != '"' { + return s, false + } + return s[1 : len(s)-1], true +} + +func split(s string) []string { + var r []string + var x int + var y bool + for i := 0; i < len(s); i++ { + switch { + case s[i] == ',' && !y: + r = append(r, s[x:i]) + x = i + 1 + case s[i] == '"': + if y && i > 0 && s[i-1] != '\\' { + y = false + continue + } + y = true + } + } + return append(r, s[x:]) +} diff --git a/internal/sessionid/sessionid.go b/internal/sessionid/sessionid.go new file mode 100644 index 0000000..88b7a60 --- /dev/null +++ b/internal/sessionid/sessionid.go @@ -0,0 +1,81 @@ +package sessionid + +import ( + crand "crypto/rand" + "encoding/binary" + "encoding/hex" + "hash/fnv" + "math/rand" + "net/http" + "os" + "sync" + + "github.com/pyroscope-io/pyroscope-lambda-extension/internal/flameql" +) + +const LabelName = "__session_id__" + +func InjectToRequest(sessionID string, r *http.Request) { + parsed, err := flameql.ParseKey(r.URL.Query().Get("name")) + if err != nil { + // This is an invalid request, but we defer to the backend. + return + } + if _, ok := parsed.Labels()[LabelName]; !ok { + parsed.Add(LabelName, sessionID) + q := r.URL.Query() + q.Set("name", parsed.Normalized()) + r.URL.RawQuery = q.Encode() + } +} + +type ID uint64 + +func (s ID) String() string { + var b [8]byte + binary.LittleEndian.PutUint64(b[:], uint64(s)) + return hex.EncodeToString(b[:]) +} + +func New() ID { return globalSessionIDGenerator.newSessionID() } + +var globalSessionIDGenerator = newSessionIDGenerator() + +type sessionIDGenerator struct { + sync.Mutex + src *rand.Rand +} + +func (gen *sessionIDGenerator) newSessionID() ID { + var b [8]byte + gen.Lock() + _, _ = gen.src.Read(b[:]) + gen.Unlock() + return ID(binary.LittleEndian.Uint64(b[:])) +} + +func newSessionIDGenerator() *sessionIDGenerator { + s, ok := sessionIDHostSeed() + if !ok { + s = sessionIDRandSeed() + } + return &sessionIDGenerator{src: rand.New(rand.NewSource(s))} +} + +func sessionIDRandSeed() int64 { + var rndSeed int64 + _ = binary.Read(crand.Reader, binary.LittleEndian, &rndSeed) + return rndSeed +} + +var hostname = os.Hostname + +func sessionIDHostSeed() (int64, bool) { + v, err := hostname() + if err != nil { + return 0, false + } + h := fnv.New64a() + _, _ = h.Write([]byte(v)) + return int64(h.Sum64()), true +} diff --git a/internal/sortedmap/sortedmap.go b/internal/sortedmap/sortedmap.go new file mode 100644 index 0000000..821d864 --- /dev/null +++ b/internal/sortedmap/sortedmap.go @@ -0,0 +1,33 @@ +package sortedmap + +import ( + "sort" +) + +type SortedMap struct { + data map[string]interface{} + keys []string +} + +func (s *SortedMap) Put(k string, v interface{}) { + s.data[k] = v + i := sort.Search(len(s.keys), func(i int) bool { return s.keys[i] >= k }) + s.keys = append(s.keys, "") + copy(s.keys[i+1:], s.keys[i:]) + s.keys[i] = k +} + +func (s *SortedMap) Get(k string) (v interface{}) { + return s.data[k] +} + +func (s *SortedMap) Keys() []string { + return s.keys +} + +func New() *SortedMap { + return &SortedMap{ + data: make(map[string]interface{}), + keys: make([]string, 0), + } +} diff --git a/main.go b/main.go index 181d18c..80a38df 100644 --- a/main.go +++ b/main.go @@ -12,10 +12,12 @@ import ( "syscall" "time" + "github.com/sirupsen/logrus" + "github.com/pyroscope-io/pyroscope-lambda-extension/extension" + "github.com/pyroscope-io/pyroscope-lambda-extension/internal/sessionid" "github.com/pyroscope-io/pyroscope-lambda-extension/relay" "github.com/pyroscope-io/pyroscope-lambda-extension/selfprofiler" - "github.com/sirupsen/logrus" ) var ( @@ -45,7 +47,7 @@ var ( logFileFieldName = getEnvStrOr("PYROSCOPE_LOG_FILE_FIELD_NAME", logrus.FieldKeyFile) // to where relay data to - remoteAddress = getEnvStrOr("PYROSCOPE_REMOTE_ADDRESS", "https://ingest.pyroscope.cloud") + remoteAddress = getEnvStrOr("PYROSCOPE_REMOTE_ADDRESS", "https://profiles-prod-001.grafana.net") authToken = getEnvStrOr("PYROSCOPE_AUTH_TOKEN", "") basicAuthUser = getEnvStrOr("PYROSCOPE_BASIC_AUTH_USER", "") @@ -76,6 +78,7 @@ func main() { HTTPHeadersJSON: httpHeaders, Timeout: timeout, MaxIdleConnsPerHost: numWorkers, + SessionID: sessionid.New().String(), }) // TODO(eh-am): a find a better default for num of workers queue := relay.NewRemoteQueue(logger, &relay.RemoteQueueCfg{NumWorkers: numWorkers}, remoteClient) @@ -272,4 +275,4 @@ func getEnvIntOr(key string, fallback int) int { } return fallback -} \ No newline at end of file +} diff --git a/relay/client.go b/relay/client.go index 89adad8..254eb5e 100644 --- a/relay/client.go +++ b/relay/client.go @@ -11,6 +11,8 @@ import ( "time" "github.com/sirupsen/logrus" + + "github.com/pyroscope-io/pyroscope-lambda-extension/internal/sessionid" ) var ( @@ -28,13 +30,15 @@ type RemoteClientCfg struct { HTTPHeadersJSON string Timeout time.Duration MaxIdleConnsPerHost int + SessionID string } type RemoteClient struct { - config *RemoteClientCfg - client *http.Client - headers map[string]string - log *logrus.Entry + config *RemoteClientCfg + client *http.Client + headers map[string]string + log *logrus.Entry + sessionID string } func NewRemoteClient(log *logrus.Entry, config *RemoteClientCfg) *RemoteClient { @@ -53,8 +57,9 @@ func NewRemoteClient(log *logrus.Entry, config *RemoteClientCfg) *RemoteClient { } } return &RemoteClient{ - log: log, - config: config, + log: log, + config: config, + sessionID: config.SessionID, client: &http.Client{ Timeout: timeout, Transport: &http.Transport{ @@ -88,7 +93,7 @@ func (r *RemoteClient) Send(req *http.Request) error { req.URL.Path = path.Join(u.Path, req.URL.Path) req.Header.Set("X-Forwarded-Host", req.Header.Get("Host")) req.Host = u.Host - + sessionid.InjectToRequest(r.sessionID, req) // TODO(eh-am): check it's a request to /ingest? r.log.Debugf("Making request to %s", req.URL.String()) res, err := r.client.Do(req) diff --git a/relay/client_test.go b/relay/client_test.go index 91d2615..09b8038 100644 --- a/relay/client_test.go +++ b/relay/client_test.go @@ -8,8 +8,12 @@ import ( "testing" "time" - "github.com/pyroscope-io/pyroscope-lambda-extension/relay" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/pyroscope-io/pyroscope-lambda-extension/internal/flameql" + "github.com/pyroscope-io/pyroscope-lambda-extension/internal/sessionid" + "github.com/pyroscope-io/pyroscope-lambda-extension/relay" ) func TestRemoteClient(t *testing.T) { @@ -21,10 +25,26 @@ func TestRemoteClient(t *testing.T) { profile := readTestdataFile(t, "testdata/profile.pprof") authToken := "123" + rcc := &relay.RemoteClientCfg{ + AuthToken: "123", + SessionID: sessionid.New().String(), + } + remoteServer := httptest.NewServer(http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { assert.Equal(t, u.Path, r.URL.Path, "path is mirrored") - assert.Equal(t, u.RawQuery, r.URL.RawQuery, "query params are mirrored") + + q := r.URL.Query() + parsed, err := flameql.ParseKey(q.Get("name")) + require.NoError(t, err) + require.Equal(t, + rcc.SessionID, + parsed.Labels()[sessionid.LabelName], + "requests has __session_id__ label") + + delete(parsed.Labels(), sessionid.LabelName) + q.Set("name", parsed.Normalized()) + assert.Equal(t, u.RawQuery, q.Encode(), "query params are mirrored") body := &bytes.Buffer{} body.ReadFrom(r.Body) @@ -34,7 +54,8 @@ func TestRemoteClient(t *testing.T) { }), ) - remoteClient := relay.NewRemoteClient(logger, &relay.RemoteClientCfg{Address: remoteServer.URL, AuthToken: "123"}) + rcc.Address = remoteServer.URL + remoteClient := relay.NewRemoteClient(logger, rcc) req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(profile)) assert.NoError(t, err) diff --git a/relay/server_test.go b/relay/server_test.go deleted file mode 100644 index e7b6996..0000000 --- a/relay/server_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package relay_test - -//func TestServer(t *testing.T) { -// svc := relay.NewServer(noopLogger(), &relay.ServerCfg{ -// ServerAddress: ":0", -// }, func(w http.ResponseWriter, r *http.Request) { -// -// }) -// -// go func() { -// err := svc.Start() -// }() -// -// -// httptest.NewServer -// assert.NoError(t, err) -// assert.Equal(t, "true", "false") -//}