Skip to content

Commit

Permalink
feat: Allow to base64 decode messages from Azure Queue Storage (#2627)
Browse files Browse the repository at this point in the history
Signed-off-by: eduardodbr <eduardodbr@hotmail.com>
  • Loading branch information
eduardodbr committed May 27, 2023
1 parent d5a0282 commit bca684a
Show file tree
Hide file tree
Showing 10 changed files with 531 additions and 435 deletions.
13 changes: 13 additions & 0 deletions api/event-source.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions api/event-source.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,10 @@
"$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector",
"description": "ConnectionString is the connection string to access Azure Queue Storage. If this fields is not provided it will try to access via Azure AD with StorageAccountName."
},
"decodeMessage": {
"description": "DecodeMessage specifies if all the messages should be base64 decoded. If set to true the decoding is done before the evaluation of JSONBody",
"type": "boolean"
},
"dlq": {
"description": "DLQ specifies if a dead-letter queue is configured for messages that can't be processed successfully. If set to true, messages with invalid payload won't be acknowledged to allow to forward them farther to the dead-letter queue. The default value is false.",
"type": "boolean"
Expand Down
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 15 additions & 2 deletions eventsources/sources/azurequeuestorage/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package azurequeuestorage

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"time"
Expand Down Expand Up @@ -126,11 +127,23 @@ func (el *EventListener) processMessage(message *azqueue.DequeuedMessage, dispat
InsertionTime: *message.InsertionTime,
Metadata: el.AzureQueueStorageEventSource.Metadata,
}
body := []byte(*message.MessageText)
if el.AzureQueueStorageEventSource.DecodeMessage {
rawDecodedText, err := base64.RawURLEncoding.DecodeString(*message.MessageText)
if err != nil {
log.Errorw("failed to base64 decode message...", zap.Error(err))
el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName())
if !el.AzureQueueStorageEventSource.DLQ {
ack()
}
return
}
body = rawDecodedText
}
if el.AzureQueueStorageEventSource.JSONBody {
body := []byte(*message.MessageText)
data.Body = (*json.RawMessage)(&body)
} else {
data.Body = []byte(*message.MessageText)
data.Body = body
}
eventBytes, err := json.Marshal(data)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions examples/event-sources/azure-queue-storage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ spec:
# jsonBody specifies that all event body payload coming from this
# source will be JSON
jsonBody: true
# DecodeMessage specifies if all the messages from AQS should be base64 decoded
decodeMessage: false
# connection string contains information about K8s secret that stores the connection string
connectionString:
# Key within the K8s secret whose corresponding value (must be base64 encoded) is access key
Expand Down
898 changes: 465 additions & 433 deletions pkg/apis/eventsource/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/apis/eventsource/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/apis/eventsource/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/apis/eventsource/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,10 @@ type AzureQueueStorageEventSource struct {
// Filter
// +optional
Filter *EventSourceFilter `json:"filter,omitempty" protobuf:"bytes,7,opt,name=filter"`
// DecodeMessage specifies if all the messages should be base64 decoded.
// If set to true the decoding is done before the evaluation of JSONBody
// +optional
DecodeMessage bool `json:"decodeMessage,omitempty" protobuf:"bytes,8,opt,name=decodeMessage"`
}

// StripeEventSource describes the event source for stripe webhook notifications
Expand Down

0 comments on commit bca684a

Please sign in to comment.