Skip to content

Commit

Permalink
chore: v3 validate message size (#68)
Browse files Browse the repository at this point in the history
* chore: validate message size

* chore: change to byteSize

* chore: adding unit test for message max size
  • Loading branch information
arifinab committed Jan 8, 2024
1 parent 580eedb commit 289ba20
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 4 deletions.
31 changes: 27 additions & 4 deletions v3/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ const (
auditLogTopicEnvKey = "APP_EVENT_STREAM_AUDIT_LOG_TOPIC"
auditLogEnableEnvKey = "APP_EVENT_STREAM_AUDIT_LOG_ENABLED"
auditLogTopicDefault = "auditLog"

messageAdditionalSizeApprox = 2048 // in bytes; approximate metadata overhead added to each message sent to Kafka
)

var (
	auditLogTopic = ""
	auditEnabled  = true

	// Sentinel errors returned by the publish/subscribe APIs; callers compare
	// against these variables, not against the message text.
	errPubNilEvent     = errors.New("unable to publish nil event")
	errSubNilEvent     = errors.New("unable to subscribe nil event")
	errMessageTooLarge = errors.New("message too large") // was "message to large" (typo)
)

// KafkaClient wraps client's functionality for Kafka
Expand Down Expand Up @@ -219,6 +222,10 @@ func (client *KafkaClient) Publish(publishBuilder *PublishBuilder) error {
return fmt.Errorf("unable to construct event : %s , error : %v", publishBuilder.eventName, err)
}

if err = client.validateMessageSize(&message); err != nil {
return err
}

config := client.publishConfig

if len(publishBuilder.topic) > 1 {
Expand Down Expand Up @@ -295,6 +302,10 @@ func (client *KafkaClient) PublishSync(publishBuilder *PublishBuilder) error {
return fmt.Errorf("unable to construct event : %s , error : %v", publishBuilder.eventName, err)
}

if err = client.validateMessageSize(&message); err != nil {
return err
}

config := client.publishConfig
if len(publishBuilder.topic) != 1 {
return fmt.Errorf("incorrect number of topics for sync publish")
Expand All @@ -305,6 +316,18 @@ func (client *KafkaClient) PublishSync(publishBuilder *PublishBuilder) error {
return client.publishEvent(publishBuilder.ctx, topic, publishBuilder.eventName, config, message)
}

// validateMessageSize rejects a message whose key plus value would exceed the
// writer's byte budget. The budget is the configured BatchBytes (or the 1 MiB
// Kafka default when unset or non-positive), minus a fixed allowance for the
// extra data attached to each message before it is sent.
func (client *KafkaClient) validateMessageSize(msg *kafka.Message) error {
	limit := client.publishConfig.BatchBytes
	if limit <= 0 {
		limit = 1048576 // Kafka's default maximum message size, in bytes
	}
	if len(msg.Key)+len(msg.Value) > limit-messageAdditionalSizeApprox {
		return errMessageTooLarge
	}
	return nil
}

// Publish send event to a topic
func (client *KafkaClient) publishEvent(ctx context.Context, topic, eventName string, config kafka.WriterConfig,
message kafka.Message) (err error) {
Expand Down
168 changes: 168 additions & 0 deletions v3/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package eventstream

import (
"context"
"math/rand"
"testing"
"time"

"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -329,3 +332,168 @@ func TestKafkaSubNilCallback(t *testing.T) {

assert.Equal(t, errInvalidCallback, err, "error should be equal")
}

// seededRand is a process-local PRNG for generating test data, seeded from the
// wall clock so each run produces different strings. (Redundant explicit
// *rand.Rand type removed; the initializer already fixes the type.)
var seededRand = rand.New(rand.NewSource(time.Now().UnixNano()))

// randomStringCharset holds the alphanumeric characters randomString draws
// from. Hoisted to a package constant so the string is not re-concatenated on
// every call.
const randomStringCharset = "abcdefghijklmnopqrstuvwxyz" +
	"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// randomString returns a random alphanumeric string of the given length.
func randomString(length int) string {
	b := make([]byte, length)
	for i := range b {
		b[i] = randomStringCharset[seededRand.Intn(len(randomStringCharset))]
	}
	return string(b)
}

// makePayload builds a test payload with keyLength entries, each mapping a
// random 32-character key to a random string of messageLength characters, so
// the serialized size is roughly keyLength*(32+messageLength) bytes.
func makePayload(keyLength, messageLength int) map[string]interface{} {
	// Pre-size the map to avoid rehashing while it is filled.
	ret := make(map[string]interface{}, keyLength)
	for i := 0; i < keyLength; i++ {
		ret[randomString(32)] = randomString(messageLength)
	}
	return ret
}

// TestKafkaMaxMessageSize verifies Publish's message-size validation with the
// client's default configuration: a small payload publishes without error,
// while an oversized one is rejected with errMessageTooLarge.
func TestKafkaMaxMessageSize(t *testing.T) {
	t.Parallel()
	// createKafkaClient / constructTopicTest are shared helpers defined
	// elsewhere in this package; presumably the client is created without a
	// custom BatchBytes, so the 1 MiB default limit applies — verify.
	client := createKafkaClient(t)
	topicName := constructTopicTest()

	testCases := []struct {
		Payload map[string]interface{}
		Err     error
	}{
		// ~10 KiB payload (10 keys x ~1 KiB values): under the default limit.
		{Payload: makePayload(10, 1000), Err: nil},
		// ~2 MiB payload (2000 keys x ~1 KiB values): exceeds the default limit.
		{Payload: makePayload(2000, 1000), Err: errMessageTooLarge},
	}

	for _, testCase := range testCases {
		var mockPayload = testCase.Payload

		mockAdditionalFields := map[string]interface{}{
			"summary": "user:_failed",
		}

		// Fully populated event so the message carries realistic metadata in
		// addition to the payload under test.
		mockEvent := &Event{
			EventName:        "testEvent",
			Namespace:        "event",
			ClientID:         "661a4ac82b854f3ca3ac2e0377d356e4",
			TraceID:          "5005e27d01064f23b962e8fd2e560a8a",
			SpanContext:      "test-span-context",
			UserID:           "661a4ac82b854f3ca3ac2e0377d356e4",
			EventID:          3,
			EventType:        301,
			EventLevel:       3,
			ServiceName:      "test",
			ClientIDs:        []string{"7d480ce0e8624b02901bd80d9ba9817c"},
			TargetUserIDs:    []string{"1fe7f425a0e049d29d87ca3d32e45b5a"},
			TargetNamespace:  "publisher",
			Privacy:          true,
			AdditionalFields: mockAdditionalFields,
			Version:          2,
			Payload:          mockPayload,
		}

		// Publish via the builder; size validation runs before the message is
		// handed to the writer, so the expected error (if any) comes back
		// synchronously.
		err := client.Publish(
			NewPublish().
				Topic(topicName).
				EventName(mockEvent.EventName).
				Namespace(mockEvent.Namespace).
				ClientID(mockEvent.ClientID).
				UserID(mockEvent.UserID).
				SessionID(mockEvent.SessionID).
				TraceID(mockEvent.TraceID).
				SpanContext(mockEvent.SpanContext).
				EventID(mockEvent.EventID).
				EventType(mockEvent.EventType).
				EventLevel(mockEvent.EventLevel).
				ServiceName(mockEvent.ServiceName).
				ClientIDs(mockEvent.ClientIDs).
				TargetUserIDs(mockEvent.TargetUserIDs).
				TargetNamespace(mockEvent.TargetNamespace).
				Privacy(mockEvent.Privacy).
				AdditionalFields(mockEvent.AdditionalFields).
				Version(2).
				Context(context.Background()).
				Payload(mockPayload))

		assert.Equal(t, testCase.Err, err)
	}
}

// TestKafkaMaxMessageSizeModified verifies that the size validation honors a
// custom BatchBytes: with a 4 KiB limit, a payload that would pass under the
// default 1 MiB limit is rejected with errMessageTooLarge.
func TestKafkaMaxMessageSizeModified(t *testing.T) {
	t.Parallel()

	// Explicit writer config with a small BatchBytes so validateMessageSize
	// uses 4096 (minus the fixed overhead allowance) instead of the default.
	config := &BrokerConfig{
		CACertFile:       "",
		StrictValidation: true,
		DialTimeout:      2 * time.Second,
		BaseWriterConfig: &kafka.WriterConfig{
			BatchBytes: 4096,
		},
	}

	// NOTE(review): the NewClient error is discarded here — assumed fine for
	// a local test broker; confirm the helper conventions.
	brokerList := []string{"localhost:9092"}
	client, _ := NewClient(prefix, eventStreamKafka, brokerList, config)
	topicName := constructTopicTest()

	testCases := []struct {
		Payload map[string]interface{}
		Err     error
	}{
		// ~1 KiB payload: fits in the 4 KiB budget.
		{Payload: makePayload(1, 1000), Err: nil},
		// ~10 KiB payload: exceeds the 4 KiB budget.
		{Payload: makePayload(10, 1000), Err: errMessageTooLarge},
	}

	for _, testCase := range testCases {
		var mockPayload = testCase.Payload

		mockAdditionalFields := map[string]interface{}{
			"summary": "user:_failed",
		}

		// Fully populated event so the message carries realistic metadata in
		// addition to the payload under test.
		mockEvent := &Event{
			EventName:        "testEvent",
			Namespace:        "event",
			ClientID:         "661a4ac82b854f3ca3ac2e0377d356e4",
			TraceID:          "5005e27d01064f23b962e8fd2e560a8a",
			SpanContext:      "test-span-context",
			UserID:           "661a4ac82b854f3ca3ac2e0377d356e4",
			EventID:          3,
			EventType:        301,
			EventLevel:       3,
			ServiceName:      "test",
			ClientIDs:        []string{"7d480ce0e8624b02901bd80d9ba9817c"},
			TargetUserIDs:    []string{"1fe7f425a0e049d29d87ca3d32e45b5a"},
			TargetNamespace:  "publisher",
			Privacy:          true,
			AdditionalFields: mockAdditionalFields,
			Version:          2,
			Payload:          mockPayload,
		}

		// Publish via the builder; size validation runs before the message is
		// handed to the writer, so the expected error (if any) comes back
		// synchronously.
		err := client.Publish(
			NewPublish().
				Topic(topicName).
				EventName(mockEvent.EventName).
				Namespace(mockEvent.Namespace).
				ClientID(mockEvent.ClientID).
				UserID(mockEvent.UserID).
				SessionID(mockEvent.SessionID).
				TraceID(mockEvent.TraceID).
				SpanContext(mockEvent.SpanContext).
				EventID(mockEvent.EventID).
				EventType(mockEvent.EventType).
				EventLevel(mockEvent.EventLevel).
				ServiceName(mockEvent.ServiceName).
				ClientIDs(mockEvent.ClientIDs).
				TargetUserIDs(mockEvent.TargetUserIDs).
				TargetNamespace(mockEvent.TargetNamespace).
				Privacy(mockEvent.Privacy).
				AdditionalFields(mockEvent.AdditionalFields).
				Version(2).
				Context(context.Background()).
				Payload(mockPayload))
		assert.Equal(t, testCase.Err, err)
	}
}

0 comments on commit 289ba20

Please sign in to comment.