Skip to content

Commit

Permalink
Support RAW payload in pubsub. (#3143)
Browse files Browse the repository at this point in the history
* Support RAW payload in pubsub.

* Rename TryIsRawPayload to IsRawPayload.

* Rebase with master + contrib update.

* Increase verbosity of errors in pubsub subscriber init.
  • Loading branch information
artursouza committed May 13, 2021
1 parent 5891a3b commit ba377ee
Show file tree
Hide file tree
Showing 10 changed files with 337 additions and 164 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ github.com/
**/.project
**/.factorypath
google

test_report_e2e.json
test_report_unit.json
61 changes: 36 additions & 25 deletions pkg/grpc/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/PuerkitoBio/purell"
"github.com/dapr/components-contrib/bindings"
contrib_metadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/components-contrib/secretstores"
"github.com/dapr/components-contrib/state"
Expand Down Expand Up @@ -263,47 +264,57 @@ func (a *api) PublishEvent(ctx context.Context, in *runtimev1pb.PublishEventRequ
return &emptypb.Empty{}, err
}

body := []byte{}

if in.Data != nil {
body = in.Data
rawPayload, metaErr := contrib_metadata.IsRawPayload(in.Metadata)
if metaErr != nil {
err := status.Errorf(codes.InvalidArgument, messages.ErrMetadataGet, metaErr.Error())
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}

span := diag_utils.SpanFromContext(ctx)
corID := diag.SpanContextToW3CString(span.SpanContext())

envelope, err := runtime_pubsub.NewCloudEvent(&runtime_pubsub.CloudEvent{
ID: a.id,
Topic: in.Topic,
DataContentType: in.DataContentType,
Data: body,
TraceID: corID,
Pubsub: in.PubsubName,
})
if err != nil {
err = status.Errorf(codes.InvalidArgument, messages.ErrPubsubCloudEventCreation, err.Error())
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
body := []byte{}
if in.Data != nil {
body = in.Data
}

features := thepubsub.Features()
pubsub.ApplyMetadata(envelope, features, in.Metadata)
data := body

b, err := jsoniter.ConfigFastest.Marshal(envelope)
if err != nil {
err = status.Errorf(codes.InvalidArgument, messages.ErrPubsubCloudEventsSer, topic, pubsubName, err.Error())
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
if !rawPayload {
envelope, err := runtime_pubsub.NewCloudEvent(&runtime_pubsub.CloudEvent{
ID: a.id,
Topic: in.Topic,
DataContentType: in.DataContentType,
Data: body,
TraceID: corID,
Pubsub: in.PubsubName,
})
if err != nil {
err = status.Errorf(codes.InvalidArgument, messages.ErrPubsubCloudEventCreation, err.Error())
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}

features := thepubsub.Features()
pubsub.ApplyMetadata(envelope, features, in.Metadata)

data, err = jsoniter.ConfigFastest.Marshal(envelope)
if err != nil {
err = status.Errorf(codes.InvalidArgument, messages.ErrPubsubCloudEventsSer, topic, pubsubName, err.Error())
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}
}

req := pubsub.PublishRequest{
PubsubName: pubsubName,
Topic: topic,
Data: b,
Data: data,
Metadata: in.Metadata,
}

err = a.pubsubAdapter.Publish(&req)
err := a.pubsubAdapter.Publish(&req)
if err != nil {
nerr := status.Errorf(codes.Internal, messages.ErrPubsubPublishMessage, topic, pubsubName, err.Error())
if errors.As(err, &runtime_pubsub.NotAllowedError{}) {
Expand Down
65 changes: 39 additions & 26 deletions pkg/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"

"github.com/dapr/components-contrib/bindings"
contrib_metadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/components-contrib/secretstores"
"github.com/dapr/components-contrib/state"
Expand Down Expand Up @@ -1231,48 +1232,60 @@ func (a *api) onPublish(reqCtx *fasthttp.RequestCtx) {
body := reqCtx.PostBody()
contentType := string(reqCtx.Request.Header.Peek("Content-Type"))
metadata := getMetadataFromRequest(reqCtx)
rawPayload, metaErr := contrib_metadata.IsRawPayload(metadata)
if metaErr != nil {
msg := NewErrorResponse("ERR_PUBSUB_REQUEST_METADATA",
fmt.Sprintf(messages.ErrMetadataGet, metaErr.Error()))
respondWithError(reqCtx, fasthttp.StatusBadRequest, msg)
log.Debug(msg)
return
}

// Extract trace context from context.
span := diag_utils.SpanFromContext(reqCtx)
// Populate W3C traceparent to cloudevent envelope
corID := diag.SpanContextToW3CString(span.SpanContext())

envelope, err := runtime_pubsub.NewCloudEvent(&runtime_pubsub.CloudEvent{
ID: a.id,
Topic: topic,
DataContentType: contentType,
Data: body,
TraceID: corID,
Pubsub: pubsubName,
})
if err != nil {
msg := NewErrorResponse("ERR_PUBSUB_CLOUD_EVENTS_SER",
fmt.Sprintf(messages.ErrPubsubCloudEventCreation, err.Error()))
respondWithError(reqCtx, fasthttp.StatusInternalServerError, msg)
log.Debug(msg)
return
}
data := body
if !rawPayload {
envelope, err := runtime_pubsub.NewCloudEvent(&runtime_pubsub.CloudEvent{
ID: a.id,
Topic: topic,
DataContentType: contentType,
Data: body,
TraceID: corID,
Pubsub: pubsubName,
})
if err != nil {
msg := NewErrorResponse("ERR_PUBSUB_CLOUD_EVENTS_SER",
fmt.Sprintf(messages.ErrPubsubCloudEventCreation, err.Error()))
respondWithError(reqCtx, fasthttp.StatusInternalServerError, msg)
log.Debug(msg)
return
}

features := thepubsub.Features()
features := thepubsub.Features()

pubsub.ApplyMetadata(envelope, features, metadata)
b, err := a.json.Marshal(envelope)
if err != nil {
msg := NewErrorResponse("ERR_PUBSUB_CLOUD_EVENTS_SER",
fmt.Sprintf(messages.ErrPubsubCloudEventsSer, topic, pubsubName, err.Error()))
respondWithError(reqCtx, fasthttp.StatusInternalServerError, msg)
log.Debug(msg)
return
pubsub.ApplyMetadata(envelope, features, metadata)

data, err = a.json.Marshal(envelope)
if err != nil {
msg := NewErrorResponse("ERR_PUBSUB_CLOUD_EVENTS_SER",
fmt.Sprintf(messages.ErrPubsubCloudEventsSer, topic, pubsubName, err.Error()))
respondWithError(reqCtx, fasthttp.StatusInternalServerError, msg)
log.Debug(msg)
return
}
}

req := pubsub.PublishRequest{
PubsubName: pubsubName,
Topic: topic,
Data: b,
Data: data,
Metadata: metadata,
}

err = a.pubsubAdapter.Publish(&req)
err := a.pubsubAdapter.Publish(&req)
if err != nil {
status := fasthttp.StatusInternalServerError
msg := NewErrorResponse("ERR_PUBSUB_PUBLISH_MESSAGE",
Expand Down
78 changes: 53 additions & 25 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/contenttype"
contrib_metadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/middleware"
nr "github.com/dapr/components-contrib/nameresolution"
"github.com/dapr/components-contrib/pubsub"
Expand Down Expand Up @@ -158,6 +159,13 @@ type componentPreprocessRes struct {
unreadyDependency string
}

type pubsubSubscribedMessage struct {
cloudEvent map[string]interface{}
data []byte
topic string
metadata map[string]string
}

// NewDaprRuntime returns a new runtime with the given runtime config and global config
func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration, accessControlList *config.AccessControlList) *DaprRuntime {
return &DaprRuntime{
Expand Down Expand Up @@ -414,7 +422,7 @@ func (a *DaprRuntime) initBinding(c components_v1alpha1.Component) error {
}

func (a *DaprRuntime) beginPubSub(name string, ps pubsub.PubSub) error {
var publishFunc pubsub.Handler
var publishFunc func(ctx context.Context, msg *pubsubSubscribedMessage) error
switch a.runtimeConfig.ApplicationProtocol {
case HTTPProtocol:
publishFunc = a.publishMessageHTTP
Expand All @@ -438,6 +446,7 @@ func (a *DaprRuntime) beginPubSub(name string, ps pubsub.PubSub) error {

log.Debugf("subscribing to topic=%s on pubsub=%s", topic, name)

routeMetadata := route.metadata
if err := ps.Subscribe(pubsub.SubscribeRequest{
Topic: topic,
Metadata: route.metadata,
Expand All @@ -447,9 +456,38 @@ func (a *DaprRuntime) beginPubSub(name string, ps pubsub.PubSub) error {
}

msg.Metadata[pubsubName] = name
return publishFunc(ctx, msg)

rawPayload, err := contrib_metadata.IsRawPayload(routeMetadata)
if err != nil {
log.Errorf("error deserializing pubsub metadata: %s", err)
return err
}

var cloudEvent map[string]interface{}
data := msg.Data
if rawPayload {
cloudEvent = pubsub.FromRawPayload(msg.Data, msg.Topic, name)
data, err = a.json.Marshal(cloudEvent)
if err != nil {
log.Errorf("error serializing cloud event in pubsub %s and topic %s: %s", name, msg.Topic, err)
return err
}
} else {
err = a.json.Unmarshal(msg.Data, &cloudEvent)
if err != nil {
log.Errorf("error deserializing cloud event in pubsub %s and topic %s: %s", name, msg.Topic, err)
return err
}
}

return publishFunc(ctx, &pubsubSubscribedMessage{
cloudEvent: cloudEvent,
data: data,
topic: msg.Topic,
metadata: msg.Metadata,
})
}); err != nil {
log.Warnf("failed to subscribe to topic %s: %s", topic, err)
log.Errorf("failed to subscribe to topic %s: %s", topic, err)
}
}

Expand Down Expand Up @@ -1120,13 +1158,8 @@ func (a *DaprRuntime) initNameResolution() error {
return nil
}

func (a *DaprRuntime) publishMessageHTTP(ctx context.Context, msg *pubsub.NewMessage) error {
var cloudEvent map[string]interface{}
err := a.json.Unmarshal(msg.Data, &cloudEvent)
if err != nil {
log.Debug(errors.Errorf("failed to deserialize cloudevent: %s", err))
return err
}
func (a *DaprRuntime) publishMessageHTTP(ctx context.Context, msg *pubsubSubscribedMessage) error {
cloudEvent := msg.cloudEvent

if pubsub.HasExpired(cloudEvent) {
log.Warnf("dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string))
Expand All @@ -1135,15 +1168,15 @@ func (a *DaprRuntime) publishMessageHTTP(ctx context.Context, msg *pubsub.NewMes

var span *trace.Span

route := a.topicRoutes[msg.Metadata[pubsubName]].routes[msg.Topic]
route := a.topicRoutes[msg.metadata[pubsubName]].routes[msg.topic]
req := invokev1.NewInvokeMethodRequest(route.path)
req.WithHTTPExtension(nethttp.MethodPost, "")
req.WithRawData(msg.Data, contenttype.CloudEventContentType)
req.WithRawData(msg.data, contenttype.CloudEventContentType)

if cloudEvent[pubsub.TraceIDField] != nil {
traceID := cloudEvent[pubsub.TraceIDField].(string)
sc, _ := diag.SpanContextFromW3CString(traceID)
spanName := fmt.Sprintf("pubsub/%s", msg.Topic)
spanName := fmt.Sprintf("pubsub/%s", msg.topic)
ctx, span = diag.StartInternalCallbackSpan(ctx, spanName, sc, a.globalConfig.Spec.TracingSpec)
}

Expand All @@ -1155,7 +1188,7 @@ func (a *DaprRuntime) publishMessageHTTP(ctx context.Context, msg *pubsub.NewMes
statusCode := int(resp.Status().Code)

if span != nil {
m := diag.ConstructSubscriptionSpanAttributes(msg.Topic)
m := diag.ConstructSubscriptionSpanAttributes(msg.topic)
diag.AddAttributesToSpan(span, m)
diag.UpdateSpanStatusFromHTTPStatus(span, statusCode)
span.End()
Expand Down Expand Up @@ -1202,13 +1235,8 @@ func (a *DaprRuntime) publishMessageHTTP(ctx context.Context, msg *pubsub.NewMes
return errors.Errorf("retriable error returned from app while processing pub/sub event %v: %s. status code returned: %v", cloudEvent[pubsub.IDField].(string), body, statusCode)
}

func (a *DaprRuntime) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMessage) error {
var cloudEvent map[string]interface{}
err := a.json.Unmarshal(msg.Data, &cloudEvent)
if err != nil {
log.Debugf("error deserializing cloud events proto: %s", err)
return err
}
func (a *DaprRuntime) publishMessageGRPC(ctx context.Context, msg *pubsubSubscribedMessage) error {
cloudEvent := msg.cloudEvent

if pubsub.HasExpired(cloudEvent) {
log.Warnf("dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string))
Expand All @@ -1221,15 +1249,15 @@ func (a *DaprRuntime) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMes
DataContentType: cloudEvent[pubsub.DataContentTypeField].(string),
Type: cloudEvent[pubsub.TypeField].(string),
SpecVersion: cloudEvent[pubsub.SpecVersionField].(string),
Topic: msg.Topic,
PubsubName: msg.Metadata[pubsubName],
Topic: msg.topic,
PubsubName: msg.metadata[pubsubName],
}

if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil {
decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string))
if decodeErr != nil {
log.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr)
return err
return decodeErr
}

envelope.Data = decoded
Expand All @@ -1247,7 +1275,7 @@ func (a *DaprRuntime) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMes
if cloudEvent[pubsub.TraceIDField] != nil {
traceID := cloudEvent[pubsub.TraceIDField].(string)
sc, _ := diag.SpanContextFromW3CString(traceID)
spanName := fmt.Sprintf("pubsub/%s", msg.Topic)
spanName := fmt.Sprintf("pubsub/%s", msg.topic)

// no ops if trace is off
ctx, span = diag.StartInternalCallbackSpan(ctx, spanName, sc, a.globalConfig.Spec.TracingSpec)
Expand Down
Loading

0 comments on commit ba377ee

Please sign in to comment.