[Bug] kafka-http-connector: same message processed by different pod instances #50

Open
danyc opened this issue Jan 28, 2021 · 0 comments
Labels: bug (Something isn't working), connector-kafka

danyc commented Jan 28, 2021

Using fission

client:
  fission/core:
    BuildDate: "2020-10-20T14:43:45Z"
    GitCommit: 52508e19343ef93da4ce8960908c5569f44da225
    Version: 1.11.2
server:
  fission/core:
    BuildDate: "2020-10-20T14:43:45Z"
    GitCommit: 52508e19343ef93da4ce8960908c5569f44da225
    Version: 1.11.2

with keda

  mqt_keda:
    enabled: true
    connector_images:
      kafka:
        image: fission/keda-kafka-http-connector
        tag: 1

I have a MessageQueueTrigger defined:

apiVersion: fission.io/v1
kind: MessageQueueTrigger
metadata:
  creationTimestamp: null
  name: ...-test
  namespace: default
spec:
  contentType: application/json
  cooldownPeriod: 60
  errorTopic: ....errors.v1
  functionref:
    functionweights: null
    name: ...-test
    type: name
  maxReplicaCount: 1
  maxRetries: 3
  messageQueueType: kafka
  metadata:
    bootstrapServers: my-kafka:9092
    consumerGroup: ...-test
    topic: ....requests.v1
    lagThreshold: 100
  minReplicaCount: 0
  mqtkind: keda
  pollingInterval: 30
  respTopic: ....output.v1
  topic: ....requests.v1

Scenario: publish 200 messages on input topic and check keda scaling.

Actual results: when lag > 100, a new consumer pod was started, but some of the messages were processed by both consumers (screenshot attached)
[Image: Screenshot from 2021-01-28 14-12-36]

Expected: the consumer should handle kafka re-balancing and ensure single message processing.

I found a similar issue reported against the sarama library, IBM/sarama#1516, where one of the comments says:

The above is a symptom of not looping on Consume(). Consume() will exit without error when a rebalancing occurs and it is up to the user to call it again when this occurs.

Under the hood it seems like when a re-balance occurs all sessions are torn down completely (briefly no members exist and therefore no partitions are handled by anyone) and when you re-call Consume() a new session is brought up which should get its share of the partitions.

I'm not familiar with Go. Please check the code and assess whether there is room for improvement in how the consumer is handled.
Additionally, consider updating the sarama library to the latest version.


3 participants