Skip to content

Commit

Permalink
feat(pubsub): support message filtering in pstest (#9015)
Browse files Browse the repository at this point in the history
* feat(pstest): support message filtering

* fix(pstest): add license headers

* fix(pstest): update copyright year

* fix(pstest): filter in subscription.deliver

* fix(pstest): init filter in newSubscription

* feat(pstest): public func to validate filter

* fix: resolve conflict

* feat(pstest): filter with escaped characters

* fix: update copyright header

* fix: format code

* feat(pstest): add comment to ValidateFilter

* fix(pstest): use valid filter string in test

* fix(pstest): use valid filter string in test

---------

Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com>
  • Loading branch information
zezhehh and hongalex committed Jan 25, 2024
1 parent cc98509 commit 49231bf
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 6 deletions.
1 change: 1 addition & 0 deletions pubsub/go.mod
Expand Up @@ -9,6 +9,7 @@ require (
github.com/golang/protobuf v1.5.3
github.com/google/go-cmp v0.6.0
github.com/googleapis/gax-go/v2 v2.12.0
go.einride.tech/aip v0.65.0
go.opencensus.io v0.24.0
golang.org/x/oauth2 v0.16.0
golang.org/x/sync v0.6.0
Expand Down
3 changes: 3 additions & 0 deletions pubsub/go.sum
Expand Up @@ -75,6 +75,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.einride.tech/aip v0.65.0 h1:aqKEV1g9diXcR6DAxBVZoJn6ho8SuC+TOZFXzuu7kLU=
go.einride.tech/aip v0.65.0/go.mod h1:wcRZ57XFEvERWLPy9VqDBtXc/ZFj7ugsd32F5o8Th+s=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE=
Expand Down Expand Up @@ -187,5 +189,6 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
35 changes: 31 additions & 4 deletions pubsub/pstest/fake.go
Expand Up @@ -36,6 +36,7 @@ import (

"cloud.google.com/go/internal/testutil"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"go.einride.tech/aip/filtering"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
durpb "google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -684,6 +685,11 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti
sub.proto.RetryPolicy = req.Subscription.RetryPolicy

case "filter":
filter, err := parseFilter(req.Subscription.Filter)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "bad filter: %v", err)
}
sub.filter = &filter
sub.proto.Filter = req.Subscription.Filter

case "enable_exactly_once_delivery":
Expand Down Expand Up @@ -853,6 +859,7 @@ type subscription struct {
streams []*stream
done chan struct{}
timeNowFunc func() time.Time
filter *filtering.Filter
}

func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, deadLetterTopic *topic, ps *pb.Subscription) *subscription {
Expand All @@ -861,7 +868,7 @@ func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, dea
at = 10 * time.Second
}
ps.State = pb.Subscription_ACTIVE
return &subscription{
sub := &subscription{
topic: t,
deadLetterTopic: deadLetterTopic,
mu: mu,
Expand All @@ -871,6 +878,14 @@ func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, dea
done: make(chan struct{}),
timeNowFunc: timeNowFunc,
}
if ps.Filter != "" {
filter, err := parseFilter(ps.Filter)
if err != nil {
panic(fmt.Sprintf("pstest: bad filter: %v", err))
}
sub.filter = &filter
}
return sub
}

func (s *subscription) start(wg *sync.WaitGroup) {
Expand Down Expand Up @@ -1068,7 +1083,8 @@ func (s *subscription) pull(max int) []*pb.ReceivedMessage {
now := s.timeNowFunc()
s.maintainMessages(now)
var msgs []*pb.ReceivedMessage
for id, m := range filterMsgs(s.msgs, s.proto.EnableMessageOrdering) {
filterMsgs(s.msgs, s.filter)
for id, m := range orderMsgs(s.msgs, s.proto.EnableMessageOrdering) {
if m.outstanding() {
continue
}
Expand All @@ -1090,7 +1106,7 @@ func (s *subscription) pull(max int) []*pb.ReceivedMessage {
return msgs
}

func filterMsgs(msgs map[string]*message, enableMessageOrdering bool) map[string]*message {
func orderMsgs(msgs map[string]*message, enableMessageOrdering bool) map[string]*message {
if !enableMessageOrdering {
return msgs
}
Expand All @@ -1116,6 +1132,16 @@ func filterMsgs(msgs map[string]*message, enableMessageOrdering bool) map[string
return result
}

// filterMsgs removes from msgs, in place, every message whose attributes do
// not satisfy filter. A nil filter leaves msgs untouched.
func filterMsgs(msgs map[string]*message, filter *filtering.Filter) {
	if filter != nil {
		// The filter is evaluated against the attributes of the wrapped
		// Pub/Sub message.
		filterByAttrs(msgs, filter, func(msg *message) messageAttrs {
			return msg.proto.Message.Attributes
		})
	}
}

func (s *subscription) deliver() {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -1124,7 +1150,8 @@ func (s *subscription) deliver() {
s.maintainMessages(now)
// Try to deliver each remaining message.
curIndex := 0
for id, m := range filterMsgs(s.msgs, s.proto.EnableMessageOrdering) {
filterMsgs(s.msgs, s.filter)
for id, m := range orderMsgs(s.msgs, s.proto.EnableMessageOrdering) {
if m.outstanding() {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions pubsub/pstest/fake_test.go
Expand Up @@ -1082,14 +1082,14 @@ func TestUpdateFilter(t *testing.T) {
AckDeadlineSeconds: minAckDeadlineSecs,
Name: "projects/P/subscriptions/S",
Topic: top.Name,
Filter: "some-filter",
Filter: "NOT attributes:foo",
})

update := &pb.Subscription{
AckDeadlineSeconds: sub.AckDeadlineSeconds,
Name: sub.Name,
Topic: top.Name,
Filter: "new-filter",
Filter: "NOT attributes:bar",
}

updated := mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{
Expand Down
210 changes: 210 additions & 0 deletions pubsub/pstest/filtering.go
@@ -0,0 +1,210 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pstest

import (
	"strings"

	"go.einride.tech/aip/filtering"
	"go.einride.tech/aip/filtering/exprs"
	expr "google.golang.org/genproto/googleapis/api/expr/v1alpha1"
)

const (
// attributesStr is the identifier filter expressions use to refer to a
// message's attribute map, e.g. `attributes.name` or `attributes:name`.
attributesStr = "attributes"
// hasPrefixStr is the name under which the prefix-matching function is
// declared for filters, e.g. `hasPrefix(attributes.name, "co")`.
hasPrefixStr = "hasPrefix"
)

// ValidateFilter reports whether filter can be parsed as a subscription
// filter. It returns nil when parsing succeeds and the parse error otherwise;
// the parsed filter itself is discarded.
func ValidateFilter(filter string) error {
	if _, err := parseFilter(filter); err != nil {
		return err
	}
	return nil
}

// parseFilter parses and type-checks a filter string, returning the resulting
// filtering.Filter.
//
// Only the names declared here are usable in a filter: the hasPrefix
// function, the attributes identifier (a string-to-string map), and the
// standard functions provided by the filtering package. If the declarations
// cannot be built, the zero Filter and the error are returned; otherwise the
// result of filtering.ParseFilter is returned as-is.
func parseFilter(filter string) (filtering.Filter, error) {
	// hasPrefix takes two strings and yields a bool (presumably the first
	// type passed to NewFunctionOverload is the result type — confirm
	// against the filtering package docs).
	prefixFn := filtering.DeclareFunction(
		hasPrefixStr,
		filtering.NewFunctionOverload(
			hasPrefixStr,
			filtering.TypeBool,
			filtering.TypeString,
			filtering.TypeString,
		),
	)

	// attributes is exposed as map<string, string>.
	attrsIdent := filtering.DeclareIdent(
		attributesStr,
		filtering.TypeMap(filtering.TypeString, filtering.TypeString),
	)

	decls, err := filtering.NewDeclarations(
		prefixFn,
		attrsIdent,
		filtering.DeclareStandardFunctions(),
	)
	if err != nil {
		return filtering.Filter{}, err
	}

	return filtering.ParseFilter(request{filter}, decls)
}

// request wraps a raw filter string so that it can be handed to
// filtering.ParseFilter, which takes a value exposing GetFilter.
type request struct {
	filter string
}

// GetFilter returns the wrapped filter string unchanged.
func (req request) GetFilter() string {
	return req.filter
}

// messageAttrs is a message's attribute map, keyed by attribute name. All
// lookups are safe on a nil map and simply report "not present".
type messageAttrs map[string]string

// hasKey reports whether the attribute key exists, whatever its value.
func (a messageAttrs) hasKey(key string) bool {
	_, ok := a[key]
	return ok
}

// hasExpectedValue reports whether the attribute key exists and its value is
// exactly value. A missing key never matches, even when value is "".
func (a messageAttrs) hasExpectedValue(key, value string) bool {
	v, ok := a[key]
	return ok && v == value
}

// hasPrefix reports whether the attribute key exists and its value starts
// with prefix. A missing key never matches; an empty prefix matches any
// existing key.
func (a messageAttrs) hasPrefix(key, prefix string) bool {
	v, ok := a[key]
	return ok && strings.HasPrefix(v, prefix)
}

// atomicMatch evaluates currExpr against attributes when it is one of the
// leaf predicates, i.e. an expression that does not depend on child
// expressions:
//
//	attributes.name = "com"            key exists with exactly that value
//	attributes.name != "com"           negation of the above (a missing key matches)
//	attributes:name                    key exists
//	hasPrefix(attributes.name, "co")   key exists and its value has that prefix
//
// The patterns are tried in that order and the first one that matches
// decides the result. If currExpr is none of these shapes, atomicMatch
// returns true so that match, which ANDs this result with compositeMatch,
// leaves the decision to compositeMatch.
func atomicMatch(attributes messageAttrs, currExpr *expr.Expr) bool {
	// key and value are capture targets shared by the matchers below. They
	// are only read after the matcher that fills them has reported a match.
	var key, value string

	// Match "=" function.
	// Example: `attributes.name = "com"`
	matcher := exprs.MatchFunction(
		filtering.FunctionEquals,
		exprs.MatchAnyMember(exprs.MatchText(attributesStr), &key),
		exprs.MatchAnyString(&value),
	)
	if matcher(currExpr) {
		return attributes.hasExpectedValue(key, value)
	}

	// Match "!=" function.
	// Example: `attributes.name != "com"`
	matcher = exprs.MatchFunction(
		filtering.FunctionNotEquals,
		exprs.MatchAnyMember(exprs.MatchText(attributesStr), &key),
		exprs.MatchAnyString(&value),
	)
	if matcher(currExpr) {
		return !attributes.hasExpectedValue(key, value)
	}

	// Match ":" function.
	// Example: `attributes:name`
	matcher = exprs.MatchFunction(
		filtering.FunctionHas,
		exprs.MatchText(attributesStr),
		exprs.MatchAnyString(&key),
	)
	if matcher(currExpr) {
		return attributes.hasKey(key)
	}

	// Match "hasPrefix" function.
	// Example: `hasPrefix(attributes.name, "co")`
	// Use the same constant the function is declared under in parseFilter so
	// the declaration and this matcher cannot drift apart.
	matcher = exprs.MatchFunction(
		hasPrefixStr,
		exprs.MatchAnyMember(exprs.MatchText(attributesStr), &key),
		exprs.MatchAnyString(&value),
	)
	if matcher(currExpr) {
		return attributes.hasPrefix(key, value)
	}

	// Not a leaf predicate: stay neutral.
	return true
}

// compositeMatch evaluates currExpr against attributes when it is a logical
// combinator whose result depends on its operands (NOT, AND, OR). Operands
// are evaluated recursively through match. If currExpr is none of these
// shapes, compositeMatch returns true.
func compositeMatch(attributes messageAttrs, currExpr *expr.Expr) bool {
	var operand, lhs, rhs *expr.Expr

	switch {
	// NOT, e.g. `NOT attributes:name`.
	case exprs.MatchFunction(
		filtering.FunctionNot,
		exprs.MatchAny(&operand),
	)(currExpr):
		return !match(attributes, operand)

	// AND, e.g. `attributes.lang = "en" AND attributes:name`.
	case exprs.MatchFunction(
		filtering.FunctionAnd,
		exprs.MatchAny(&lhs),
		exprs.MatchAny(&rhs),
	)(currExpr):
		return match(attributes, lhs) && match(attributes, rhs)

	// OR, e.g. `attributes.lang = "en" OR attributes:name`.
	case exprs.MatchFunction(
		filtering.FunctionOr,
		exprs.MatchAny(&lhs),
		exprs.MatchAny(&rhs),
	)(currExpr):
		return match(attributes, lhs) || match(attributes, rhs)

	default:
		return true
	}
}

// match reports whether attributes satisfy currExpr. The leaf predicates are
// checked first, so the recursive compositeMatch is only consulted when
// atomicMatch did not reject the expression.
func match(attributes messageAttrs, currExpr *expr.Expr) bool {
	if !atomicMatch(attributes, currExpr) {
		return false
	}
	return compositeMatch(attributes, currExpr)
}

// getAttrsFunc extracts the attributes to filter on from an item of
// arbitrary type T. It lets filterByAttrs work on any map value type.
type getAttrsFunc[T any] func(T) messageAttrs

// filterByAttrs deletes from items, in place, every entry whose attributes
// (as returned by getAttrs) do not satisfy filter.
//
// It is generic so that it is easy to test, and it takes a map so that
// unmatched items can be deleted efficiently.
//
// Only a call expression at the root of the filter can reject an item; a nil
// filter, a filter without a checked expression, or a root that is not a call
// expression leaves items untouched. The root is evaluated once per item via
// match, which already recurses into nested expressions, so no separate tree
// walk is needed.
func filterByAttrs[T map[K]U, U any, K comparable](items T, filter *filtering.Filter, getAttrs getAttrsFunc[U]) {
	if filter == nil {
		return
	}

	// The generated getters are nil-safe, so a missing CheckedExpr or Expr
	// results in a non-call root rather than a panic.
	root := filter.CheckedExpr.GetExpr()
	if _, isCall := root.GetExprKind().(*expr.Expr_CallExpr); !isCall {
		return
	}

	// Deleting the current key while ranging over a map is safe in Go.
	for key, item := range items {
		if !match(getAttrs(item), root) {
			delete(items, key)
		}
	}
}

0 comments on commit 49231bf

Please sign in to comment.