Skip to content

Commit

Permalink
Merge pull request #63 from AccelByte/allow-empty-event-name
Browse files Browse the repository at this point in the history
chore: allow empty event name when subscribe
  • Loading branch information
arifinab committed Nov 15, 2023
2 parents a283d8f + 8913838 commit e3222d0
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 13 deletions.
25 changes: 19 additions & 6 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ type KafkaClient struct {

// mutex to avoid runtime races to access writers map
WritersLock sync.RWMutex

// current topic subscribed on the kafka client
topicSubscribedCount map[string]int
}

// setConfig sets some defaults for producers and consumers. Needed for backwards compatibility.
Expand Down Expand Up @@ -170,12 +173,13 @@ func newKafkaClient(brokers []string, prefix string, configList ...*BrokerConfig
config, err := setConfig(configList, brokers)

client := &KafkaClient{
prefix: prefix,
strictValidation: config.StrictValidation,
publishConfig: *config.BaseWriterConfig,
subscribeConfig: *config.BaseReaderConfig,
readers: make(map[string]*kafka.Reader),
writers: make(map[string]*kafka.Writer),
prefix: prefix,
strictValidation: config.StrictValidation,
publishConfig: *config.BaseWriterConfig,
subscribeConfig: *config.BaseReaderConfig,
readers: make(map[string]*kafka.Reader),
writers: make(map[string]*kafka.Writer),
topicSubscribedCount: make(map[string]int),
}
if config.MetricsRegistry != nil {
err = config.MetricsRegistry.Register(&kafkaprometheus.WriterCollector{Client: client})
Expand Down Expand Up @@ -446,6 +450,10 @@ func (client *KafkaClient) unregister(subscribeBuilder *SubscribeBuilder) {
client.ReadersLock.Lock()
defer client.ReadersLock.Unlock()
delete(client.readers, subscribeBuilder.Slug())
currentSubscribeCount := client.topicSubscribedCount[subscribeBuilder.topic]
if currentSubscribeCount > 0 {
client.topicSubscribedCount[subscribeBuilder.topic] = currentSubscribeCount - 1
}
}

// Register register callback function and then subscribe topic
Expand Down Expand Up @@ -602,6 +610,11 @@ func (client *KafkaClient) registerSubscriber(subscribeBuilder *SubscribeBuilder
logrus.Warnf("multiple subscribers for %+v", subscribeBuilder)
}
}
currentSubscribeCount := client.topicSubscribedCount[subscribeBuilder.topic]
if currentSubscribeCount > 0 {
logrus.WithField("topic", subscribeBuilder.topic).Warn("multiple subscribe for a topic")
}
client.topicSubscribedCount[subscribeBuilder.topic] = currentSubscribeCount + 1

client.readers[slug] = nil // It's registered. Later we set the actual value to the kafka.Writer.

Expand Down
16 changes: 9 additions & 7 deletions validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,19 @@ func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error {
return errInvalidTopicFormat
}

if isEventNameValid := validateTopicEvent(subscribeBuilder.eventName); !isEventNameValid {
logrus.
WithField("Topic Name", subscribeBuilder.topic).
WithField("Event Name", subscribeBuilder.eventName).
Errorf("unable to validate subscribe event. error: invalid event name format")
return errInvalidEventNameFormat
if subscribeBuilder.eventName != "" {
if isEventNameValid := validateTopicEvent(subscribeBuilder.eventName); !isEventNameValid {
logrus.
WithField("Topic Name", subscribeBuilder.topic).
WithField("Event Name", subscribeBuilder.eventName).
Errorf("unable to validate subscribe event. error: invalid event name format")
return errInvalidEventNameFormat
}
}

subscribeEvent := struct {
Topic string `valid:"required"`
EventName string `valid:"required"`
EventName string
GroupID string
Callback func(ctx context.Context, event *Event, err error) error
CallbackRaw func(ctx context.Context, msg []byte, err error) error
Expand Down
8 changes: 8 additions & 0 deletions validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ func TestValidateSubscriberEvent(t *testing.T) {
},
expected: false,
},
{
input: &SubscribeBuilder{
topic: "accelbyte.dev.topic-123",
eventName: "",
callback: callbackFunc,
},
expected: true,
},
}

for _, testCase := range testCases {
Expand Down

0 comments on commit e3222d0

Please sign in to comment.