Skip to content

Commit

Permalink
Merge branch 'master' into metadata-actor-status
Browse files Browse the repository at this point in the history
  • Loading branch information
dapr-bot committed Nov 20, 2023
2 parents d744520 + 7e14aac commit 3e49978
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 19 deletions.
4 changes: 3 additions & 1 deletion pkg/grpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,9 @@ func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.Execu

outboxEnabled := a.pubsubAdapter.Outbox().Enabled(in.StoreName)
if outboxEnabled {
trs, err := a.pubsubAdapter.Outbox().PublishInternal(ctx, in.StoreName, operations, a.UniversalAPI.AppID)
span := diagUtils.SpanFromContext(ctx)
corID, traceState := diag.TraceIDAndStateFromSpan(span)
trs, err := a.pubsubAdapter.Outbox().PublishInternal(ctx, in.StoreName, operations, a.UniversalAPI.AppID, corID, traceState)
if err != nil {
err = status.Errorf(codes.Internal, messages.ErrPublishOutbox, err.Error())
apiServerLogger.Debug(err)
Expand Down
4 changes: 3 additions & 1 deletion pkg/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2037,7 +2037,9 @@ func (a *api) onPostStateTransaction(reqCtx *fasthttp.RequestCtx) {

outboxEnabled := a.pubsubAdapter.Outbox().Enabled(storeName)
if outboxEnabled {
trs, err := a.pubsubAdapter.Outbox().PublishInternal(reqCtx, storeName, operations, a.universal.AppID)
span := diagUtils.SpanFromContext(reqCtx)
corID, traceState := diag.TraceIDAndStateFromSpan(span)
trs, err := a.pubsubAdapter.Outbox().PublishInternal(reqCtx, storeName, operations, a.universal.AppID, corID, traceState)
if err != nil {
msg := NewErrorResponse(
"ERR_PUBLISH_OUTBOX",
Expand Down
2 changes: 1 addition & 1 deletion pkg/outbox/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ import (
type Outbox interface {
AddOrUpdateOutbox(stateStore v1alpha1.Component)
Enabled(stateStore string) bool
PublishInternal(ctx context.Context, stateStore string, states []state.TransactionalStateOperation, source string) ([]state.TransactionalStateOperation, error)
PublishInternal(ctx context.Context, stateStore string, states []state.TransactionalStateOperation, source, traceID, traceState string) ([]state.TransactionalStateOperation, error)
SubscribeToInternalTopics(ctx context.Context, appID string) error
}
16 changes: 11 additions & 5 deletions pkg/runtime/pubsub/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func transaction() (state.TransactionalStateOperation, error) {
}

// PublishInternal publishes the state to an internal topic for outbox processing
func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, operations []state.TransactionalStateOperation, source string) ([]state.TransactionalStateOperation, error) {
func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, operations []state.TransactionalStateOperation, source, traceID, traceState string) ([]state.TransactionalStateOperation, error) {
o.lock.RLock()
c, ok := o.outboxStores[stateStore]
o.lock.RUnlock()
Expand All @@ -156,10 +156,12 @@ func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, ope
}

ce := &CloudEvent{
ID: tr.GetKey(),
Source: source,
Pubsub: c.outboxPubsub,
Data: ceData,
ID: tr.GetKey(),
Source: source,
Pubsub: c.outboxPubsub,
Data: ceData,
TraceID: traceID,
TraceState: traceState,
}

if sr.ContentType != nil {
Expand Down Expand Up @@ -220,6 +222,8 @@ func (o *outboxImpl) SubscribeToInternalTopics(ctx context.Context, appID string
stateKey := o.cloudEventExtractorFn(cloudEvent, contribPubsub.IDField)
data := []byte(o.cloudEventExtractorFn(cloudEvent, contribPubsub.DataField))
contentType := o.cloudEventExtractorFn(cloudEvent, contribPubsub.DataContentTypeField)
traceID := o.cloudEventExtractorFn(cloudEvent, contribPubsub.TraceIDField)
traceState := o.cloudEventExtractorFn(cloudEvent, contribPubsub.TraceStateField)

store, ok := o.getStateFn(stateStore)
if !ok {
Expand Down Expand Up @@ -268,6 +272,8 @@ func (o *outboxImpl) SubscribeToInternalTopics(ctx context.Context, appID string
Pubsub: c.publishPubSub,
Source: appID,
Topic: c.publishTopic,
TraceID: traceID,
TraceState: traceState,
}, nil)
if err != nil {
return err
Expand Down
28 changes: 18 additions & 10 deletions pkg/runtime/pubsub/outbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func TestPublishInternal(t *testing.T) {
Key: "key",
Value: "test",
},
}, "testapp")
}, "testapp", "", "")

assert.NoError(t, err)
})
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestPublishInternal(t *testing.T) {
Value: "test",
ContentType: &contentType,
},
}, "testapp")
}, "testapp", "", "")

assert.NoError(t, err)
})
Expand All @@ -322,7 +322,7 @@ func TestPublishInternal(t *testing.T) {
Key: "key",
Value: "test",
},
}, "testapp")
}, "testapp", "", "")
assert.Error(t, err)
})

Expand Down Expand Up @@ -359,7 +359,7 @@ func TestPublishInternal(t *testing.T) {
},
})

_, err := o.PublishInternal(context.TODO(), "test", []state.TransactionalStateOperation{}, "testapp")
_, err := o.PublishInternal(context.TODO(), "test", []state.TransactionalStateOperation{}, "testapp", "", "")

assert.NoError(t, err)
})
Expand Down Expand Up @@ -401,14 +401,14 @@ func TestPublishInternal(t *testing.T) {
Key: "1",
Value: "hello",
},
}, "testapp")
}, "testapp", "", "")

assert.Error(t, err)
})
}

func TestSubscribeToInternalTopics(t *testing.T) {
t.Run("correct configuration", func(t *testing.T) {
t.Run("correct configuration with trace", func(t *testing.T) {
o := newTestOutbox().(*outboxImpl)
o.cloudEventExtractorFn = extractCloudEventProperty

Expand All @@ -432,6 +432,14 @@ func TestSubscribeToInternalTopics(t *testing.T) {
close(externalCalledCh)
}

ce := map[string]string{}
json.Unmarshal(pr.Data, &ce)

traceID := ce[contribPubsub.TraceIDField]
traceState := ce[contribPubsub.TraceStateField]
assert.Equal(t, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", traceID)
assert.Equal(t, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", traceState)

return psMock.Publish(ctx, pr)
}
o.getPubsubFn = func(s string) (contribPubsub.PubSub, bool) {
Expand Down Expand Up @@ -480,7 +488,7 @@ func TestSubscribeToInternalTopics(t *testing.T) {
Key: "1",
Value: "hello",
},
}, appID)
}, appID, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01")

if pErr != nil {
errCh <- pErr
Expand Down Expand Up @@ -559,7 +567,7 @@ func TestSubscribeToInternalTopics(t *testing.T) {
Key: "1",
Value: "hello",
},
}, appID)
}, appID, "", "")

assert.Error(t, pErr)
assert.Len(t, trs, 0)
Expand Down Expand Up @@ -634,7 +642,7 @@ func TestSubscribeToInternalTopics(t *testing.T) {
Key: "1",
Value: "hello",
},
}, appID)
}, appID, "", "")

if pErr != nil {
errCh <- pErr
Expand Down Expand Up @@ -759,7 +767,7 @@ func TestSubscribeToInternalTopics(t *testing.T) {
Key: "1",
Value: "hello",
},
}, appID)
}, appID, "", "")

if pErr != nil {
errCh <- pErr
Expand Down
2 changes: 1 addition & 1 deletion pkg/testing/pubsubadapter_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3e49978

Please sign in to comment.