Skip to content

Commit

Permalink
Merge branch 'master' into internal-actor-messaging
Browse files Browse the repository at this point in the history
  • Loading branch information
dapr-bot committed Jan 15, 2024
2 parents 8b14af0 + 8d01d77 commit 853c705
Show file tree
Hide file tree
Showing 52 changed files with 538 additions and 343 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/dapr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ jobs:
key: ${{ matrix.target_os }}-${{ matrix.target_arch }}-go-${{ steps.setup-go.outputs.go-version }}-build-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ matrix.target_os }}-${{ matrix.target_arch }}-go-${{ steps.setup-go.outputs.go-version }}-build-
- name: Build binaries
run: make build
- name: Run make test-integration
run: make test-integration
build:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/dapr/components-contrib v1.12.1-0.20240111003925-7aa4013ca8b1
github.com/dapr/kit v0.12.2-0.20240111185916-c24d1d28cf35
github.com/dapr/kit v0.12.2-0.20240115170833-858719eb78ac
github.com/evanphx/json-patch/v5 v5.7.0
github.com/go-chi/chi/v5 v5.0.10
github.com/go-chi/cors v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,8 @@ github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuA
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
github.com/dapr/components-contrib v1.12.1-0.20240111003925-7aa4013ca8b1 h1:ux78axOxdvxKX5/5jQwLTBhZSWKpNX4ec/x/vH4O/8E=
github.com/dapr/components-contrib v1.12.1-0.20240111003925-7aa4013ca8b1/go.mod h1:8O0PgN/d8fEj/lNkkuxOX8AlGwdvJ8RHxCzyZ3skrSo=
github.com/dapr/kit v0.12.2-0.20240111185916-c24d1d28cf35 h1:38DASZIInG7I8btNZ2aAjJEl6n1Hgskw/VaQniLwLRk=
github.com/dapr/kit v0.12.2-0.20240111185916-c24d1d28cf35/go.mod h1:VyHrelNXPbtS/VcQX0Y/uzW0lfEVuveJ+1E5bDys8mo=
github.com/dapr/kit v0.12.2-0.20240115170833-858719eb78ac h1:hSroth18KVr6BNeADdQ3vQukLuI0F5KsmMbAhEZS5AA=
github.com/dapr/kit v0.12.2-0.20240115170833-858719eb78ac/go.mod h1:VyHrelNXPbtS/VcQX0Y/uzW0lfEVuveJ+1E5bDys8mo=
github.com/dave/jennifer v1.4.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
4 changes: 2 additions & 2 deletions pkg/actors/timers/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type timers struct {
activeTimersCountLock sync.RWMutex
metricsCollector timersMetricsCollector
runningCh chan struct{}
processor *queue.Processor[*internal.Reminder]
processor *queue.Processor[string, *internal.Reminder]
}

// NewTimersProvider returns a TimerProvider.
Expand All @@ -54,7 +54,7 @@ func NewTimersProvider(clock clock.WithTicker) internal.TimersProvider {
metricsCollector: diag.DefaultMonitoring.ActorTimers,
runningCh: make(chan struct{}),
}
t.processor = queue.NewProcessor[*internal.Reminder](t.processorExecuteFn)
t.processor = queue.NewProcessor[string, *internal.Reminder](t.processorExecuteFn)
return t
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/placement/placement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package placement

import (
"context"
"errors"
"net"
"strconv"
"testing"
Expand Down Expand Up @@ -53,7 +54,10 @@ func newTestPlacementServer(t *testing.T, raftServer *raft.Server) (string, *Ser
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer close(serverStopped)
require.NoError(t, testServer.Run(ctx, strconv.Itoa(port)))
err := testServer.Run(ctx, strconv.Itoa(port))
if !errors.Is(err, grpc.ErrServerStopped) {
require.NoError(t, err)
}
}()

assert.Eventually(t, func() bool {
Expand Down
23 changes: 14 additions & 9 deletions pkg/runtime/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,22 @@ func (b *Binding) Read(ctx context.Context, handler bindings.Handler) error {
metadata = b.Metadata
}

go func() {
_, err := handler(context.Background(), &bindings.ReadResponse{
Metadata: metadata,
Data: []byte(b.Data),
})
if b.ReadErrorCh != nil {
resp := &bindings.ReadResponse{
Metadata: metadata,
Data: []byte(b.Data),
}

if b.ReadErrorCh != nil {
go func() {
_, err := handler(ctx, resp)
b.ReadErrorCh <- (err != nil)
}
}()
}()

return nil
return nil
}

_, err := handler(ctx, resp)
return err
}

func (b *Binding) Operations() []bindings.OperationKind {
Expand Down
3 changes: 1 addition & 2 deletions pkg/runtime/processor/binding/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ func TestStartReadingFromBindings(t *testing.T) {
}))
require.NoError(t, b.compStore.CommitPendingComponent())
err := b.StartReadingFromBindings(context.Background())

require.NoError(t, err)
assert.True(t, mockAppChannel.AssertNotCalled(t, "InvokeMethod", mock.Anything, mock.Anything))
assert.True(t, mockAppChannel.AssertCalled(t, "InvokeMethod", mock.Anything, mock.Anything))
})
}

Expand Down
34 changes: 18 additions & 16 deletions pkg/sentry/server/validator/jwks/jwks.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,32 +52,30 @@ type Options struct {
// - sub: must include the SPIFFE ID of the requestor
type jwks struct {
sentryAudience string
opts Options
cache *jwkscache.JWKSCache
}

func New(ctx context.Context, opts Options) (validator.Validator, error) {
return &jwks{
sentryAudience: opts.SentryID.String(),
opts: opts,
}, nil
}

func (j *jwks) Start(ctx context.Context) error {
// Create a JWKS and start it
j.cache = jwkscache.NewJWKSCache(j.opts.Source, log)
cache := jwkscache.NewJWKSCache(opts.Source, log)

// Set options
if j.opts.MinRefreshInterval > time.Second {
j.cache.SetMinRefreshInterval(j.opts.MinRefreshInterval)
if opts.MinRefreshInterval > time.Second {
cache.SetMinRefreshInterval(opts.MinRefreshInterval)
}
if j.opts.RequestTimeout > time.Millisecond {
j.cache.SetRequestTimeout(j.opts.RequestTimeout)
if opts.RequestTimeout > time.Millisecond {
cache.SetRequestTimeout(opts.RequestTimeout)
}
if j.opts.CACertificate != "" {
j.cache.SetCACertificate(j.opts.CACertificate)
if opts.CACertificate != "" {
cache.SetCACertificate(opts.CACertificate)
}

return &jwks{
sentryAudience: opts.SentryID.String(),
cache: cache,
}, nil
}

func (j *jwks) Start(ctx context.Context) error {
// Start the cache. Note this is a blocking call
err := j.cache.Start(ctx)
if err != nil {
Expand All @@ -92,6 +90,10 @@ func (j *jwks) Validate(ctx context.Context, req *sentryv1pb.SignCertificateRequ
return td, false, errors.New("the request does not contain a token")
}

if err = j.cache.WaitForCacheReady(ctx); err != nil {
return td, false, errors.New("jwks validator not ready")
}

// Validate the internal request
// This also returns the trust domain.
td, _, err = internal.Validate(ctx, req)
Expand Down
19 changes: 9 additions & 10 deletions tests/integration/framework/process/sentry/sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,17 @@ func (s *Sentry) Cleanup(t *testing.T) {

func (s *Sentry) WaitUntilRunning(t *testing.T, ctx context.Context) {
client := util.HTTPClient(t)
assert.Eventually(t, func() bool {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://localhost:%d/healthz", s.healthzPort), nil)
if err != nil {
return false
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://localhost:%d/healthz", s.healthzPort), nil)
require.NoError(t, err)

assert.EventuallyWithT(t, func(c *assert.CollectT) {
resp, err := client.Do(req)
if err != nil {
return false
//nolint:testifylint
if assert.NoError(c, err) {
defer resp.Body.Close()
assert.Equal(c, http.StatusOK, resp.StatusCode)
}
defer resp.Body.Close()
return http.StatusOK == resp.StatusCode
}, time.Second*5, 100*time.Millisecond)
}, time.Second*20, 100*time.Millisecond)
}

func (s *Sentry) TrustAnchorsFile(t *testing.T) string {
Expand Down
47 changes: 30 additions & 17 deletions tests/integration/framework/process/statestore/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (
"net"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"

"github.com/dapr/components-contrib/state"
Expand All @@ -37,8 +40,8 @@ type StateStore struct {
listener net.Listener
socketName string
component *component
server *grpc.Server
srvErrCh chan error
stopCh chan struct{}
}

func New(t *testing.T, fopts ...Option) *StateStore {
Expand All @@ -61,12 +64,21 @@ func New(t *testing.T, fopts ...Option) *StateStore {
listener, err := net.Listen("unix", path)
require.NoError(t, err)

component := newComponent(t, opts)

server := grpc.NewServer()
compv1pb.RegisterStateStoreServer(server, component)
compv1pb.RegisterTransactionalStateStoreServer(server, component)
compv1pb.RegisterQueriableStateStoreServer(server, component)
compv1pb.RegisterTransactionalStoreMultiMaxSizeServer(server, component)
reflection.Register(server)

return &StateStore{
listener: listener,
component: newComponent(t, opts),
component: component,
socketName: socketFile,
server: server,
srvErrCh: make(chan error),
stopCh: make(chan struct{}),
}
}

Expand All @@ -76,26 +88,27 @@ func (s *StateStore) SocketName() string {

func (s *StateStore) Run(t *testing.T, ctx context.Context) {
s.component.impl.Init(ctx, state.Metadata{})

server := grpc.NewServer()
compv1pb.RegisterStateStoreServer(server, s.component)
compv1pb.RegisterTransactionalStateStoreServer(server, s.component)
compv1pb.RegisterQueriableStateStoreServer(server, s.component)
compv1pb.RegisterTransactionalStoreMultiMaxSizeServer(server, s.component)
reflection.Register(server)

go func() {
s.srvErrCh <- server.Serve(s.listener)
s.srvErrCh <- s.server.Serve(s.listener)
}()

go func() {
<-s.stopCh
server.GracefulStop()
}()
conn, err := grpc.DialContext(ctx, "unix://"+s.listener.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
require.NoError(t, err)

client := compv1pb.NewStateStoreClient(conn)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
_, err = client.Ping(ctx, new(compv1pb.PingRequest))
//nolint:testifylint
assert.NoError(c, err)
}, 10*time.Second, 100*time.Millisecond)
require.NoError(t, conn.Close())
}

func (s *StateStore) Cleanup(t *testing.T) {
close(s.stopCh)
s.server.GracefulStop()
require.NoError(t, <-s.srvErrCh)
require.NoError(t, s.component.impl.(io.Closer).Close())
}
12 changes: 8 additions & 4 deletions tests/integration/framework/util/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net/http"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

rtpbv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
Expand All @@ -38,11 +39,14 @@ func getMeta(t require.TestingT, ctx context.Context, client *http.Client, port
req, err := http.NewRequestWithContext(ctx, http.MethodGet, metaURL, nil)
require.NoError(t, err)

resp, err := client.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
var meta metaResponse
require.NoError(t, json.NewDecoder(resp.Body).Decode(&meta))
resp, err := client.Do(req)
//nolint:testifylint
if assert.NoError(t, err) {
defer resp.Body.Close()
//nolint:testifylint
assert.NoError(t, json.NewDecoder(resp.Body).Decode(&meta))
}

return meta
}
17 changes: 10 additions & 7 deletions tests/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,20 @@ func RunIntegrationTests(t *testing.T) {
})
require.False(t, binFailed, "building binaries must succeed")

focusedTests := make(map[string]suite.Case)
for name, tcase := range suite.All(t) {
focusedTests := make([]suite.NamedCase, 0)
for _, tcase := range suite.All(t) {
// Continue rather than using `t.Skip` to reduce the noise in the test
// output.
if !focus.MatchString(name) {
t.Logf("skipping test case due to focus %s", name)
if !focus.MatchString(tcase.Name()) {
t.Logf("skipping test case due to focus %s", tcase.Name())
continue
}
focusedTests[name] = tcase
focusedTests = append(focusedTests, tcase)
}

for name, tcase := range focusedTests {
t.Run(name, func(t *testing.T) {
startTime := time.Now()
for _, tcase := range focusedTests {
t.Run(tcase.Name(), func(t *testing.T) {
t.Logf("setting up test case")
options := tcase.Setup(t)

Expand All @@ -77,4 +78,6 @@ func RunIntegrationTests(t *testing.T) {
t.Log("done")
})
}

t.Logf("Total integration test execution time: %s", time.Since(startTime).Truncate(time.Millisecond*100))
}
6 changes: 3 additions & 3 deletions tests/integration/suite/actors/grpc/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (l *ttl) Run(t *testing.T, ctx context.Context) {
})
//nolint:testifylint
assert.NoError(c, err)
}, time.Second*10, time.Millisecond*100, "actor not ready")
}, time.Second*20, time.Millisecond*100, "actor not ready")

now := time.Now()

Expand Down Expand Up @@ -146,14 +146,14 @@ func (l *ttl) Run(t *testing.T, ctx context.Context) {
Key: "mykey",
Value: &anypb.Any{Value: []byte("myvalue")},
Metadata: map[string]string{
"ttlInSeconds": "4",
"ttlInSeconds": "3",
},
},
},
})
require.NoError(t, err)

time.Sleep(time.Second * 3)
time.Sleep(time.Second * 1)

var resp *rtv1.GetActorStateResponse
resp, err = client.GetActorState(ctx, &rtv1.GetActorStateRequest{
Expand Down
6 changes: 5 additions & 1 deletion tests/integration/suite/actors/healthz/initerror.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"net/http"
"strconv"
"sync"
"testing"
"time"

Expand All @@ -42,6 +43,7 @@ type initerror struct {
configCalled chan struct{}
blockConfig chan struct{}
healthzCalled chan struct{}
once sync.Once
}

func (i *initerror) Setup(t *testing.T) []framework.Option {
Expand All @@ -57,7 +59,9 @@ func (i *initerror) Setup(t *testing.T) []framework.Option {
})
handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
close(i.healthzCalled)
i.once.Do(func() {
close(i.healthzCalled)
})
})
handler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`OK`))
Expand Down

0 comments on commit 853c705

Please sign in to comment.