
App crashes the first time it tries to subscribe to a kafka topic #426

Closed
demetris-manikas opened this issue May 7, 2024 · 18 comments

demetris-manikas commented May 7, 2024

After adding the Kafka integration to dbos-docker-boilerplate, when the app starts with newly created volumes (docker compose down -v; docker compose up), it encounters the following error:

This server does not host this topic-partition

dbos-app  |     at createErrorFromCode (/home/node/app/node_modules/kafkajs/src/protocol/error.js:581:10)
...
...

Debugging showed that the error is thrown at kafka.ts#L88 the first time the consumer tries to subscribe, and the app exits at runtime.ts#L54.

Running docker compose down; docker compose up works fine after the first time.

Setting the Kafka parameter AUTO_CREATE_TOPIC=true did not change anything.

What did help was adding this hack(?):

await kafka.admin().createTopics({
  topics: [
    { topic: ro.kafkaTopic },
  ],
});

before initiating the consumer in kafka.ts.

I am no expert at configuring Kafka, so it might just be a misconfiguration on my part.


kraftp commented May 7, 2024

This looks like a Kafka configuration issue. Subscribing to a nonexistent topic is supposed to fail in Kafka by default. However, like you said, you should be able to change that default behavior by enabling topic auto-creation. Can you make sure that the variable is set correctly? Depending on what Kafka Docker image you're using, it may have a different name.

@demetris-manikas
Contributor Author

I'll try some more then and report back. Thanks.


demetris-manikas commented May 7, 2024

Well no luck...

dbos-app  | 2024-05-07 20:05:40 [info]: DBOS Server is running at http://localhost:3000 
dbos-app  | 2024-05-07 20:05:40 [info]: DBOS Admin Server is running at http://localhost:3001 
broker    | [2024-05-07 20:05:40,116] INFO Sent auto-creation request for Set(test-topic) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
dbos-app  | {"level":"ERROR","timestamp":"2024-05-07T20:05:40.119Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"broker:9092","clientId":"dbos-app","error":"This server does not host this topic-partition","correlationId":1,"size":101}
dbos-app  | 2024-05-07 20:05:40 [error]: This server does not host this topic-partition 
dbos-app  |     at createErrorFromCode (/home/node/app/node_modules/kafkajs/src/protocol/error.js:581:10)
dbos-app  |     at Object.parse (/home/node/app/node_modules/kafkajs/src/protocol/requests/metadata/v0/response.js:55:11)
dbos-app  |     at Connection.send (/home/node/app/node_modules/kafkajs/src/network/connection.js:433:35)
dbos-app  |     at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
dbos-app  |     at async [private:Broker:sendRequest] (/home/node/app/node_modules/kafkajs/src/broker/index.js:904:14)
dbos-app  |     at async Broker.metadata (/home/node/app/node_modules/kafkajs/src/broker/index.js:177:12)
dbos-app  |     at async /home/node/app/node_modules/kafkajs/src/cluster/brokerPool.js:158:25
dbos-app  |     at async /home/node/app/node_modules/kafkajs/src/cluster/index.js:111:14
dbos-app  |     at async Cluster.refreshMetadata (/home/node/app/node_modules/kafkajs/src/cluster/index.js:172:5)
dbos-app  |     at async Cluster.addMultipleTargetTopics (/home/node/app/node_modules/kafkajs/src/cluster/index.js:230:11)
dbos-app  | [nodemon] app crashed
broker    | [2024-05-07 20:05:40,149] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test-topic-0) (kafka.server.ReplicaFetcherManager)
broker    | [2024-05-07 20:05:40,156] INFO [LogLoader partition=test-topic-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
broker    | [2024-05-07 20:05:40,158] INFO Created log for partition test-topic-0 in /tmp/kraft-combined-logs/test-topic-0 with properties {} (kafka.log.LogManager)
broker    | [2024-05-07 20:05:40,159] INFO [Partition test-topic-0 broker=1] No checkpointed highwatermark is found for partition test-topic-0 (kafka.cluster.Partition)
broker    | [2024-05-07 20:05:40,160] INFO [Partition test-topic-0 broker=1] Log loaded for partition test-topic-0 with initial high watermark 0 (kafka.cluster.Partition)

As you can see from the logs, the auto-creation request is made, but asynchronously: the broker creates test-topic-0 only after the consumer's metadata request has already failed.

Anyway, as a workaround I set restart: unless-stopped on my app container and changed nodemon to nodemon --exitcrash, so at least I can keep it running.
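
For reference, a minimal sketch of that workaround in docker-compose terms (the service name, build context, and entry point are assumptions for illustration, not the actual boilerplate files):

  dbos-app:
    build: .
    # Let Docker restart the container after each crash...
    restart: unless-stopped
    # ...and make nodemon actually exit on crash instead of waiting for file changes.
    command: npx nodemon --exitcrash dist/main.js

By default nodemon keeps the process alive after a crash ("[nodemon] app crashed"), so the restart policy never kicks in; --exitcrash makes the container exit and lets Docker retry until the topic exists.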

If you could provide a working combination of Kafka image and config (like the one you use for testing), I will be able to investigate this further. Otherwise, go ahead and close this. :)

Thanks for your time


kraftp commented May 7, 2024

Here's the docker-compose.yml that I use for testing. I can't get it to reproduce the problem, which makes me suspect it's a Kafka configuration issue.

version: "3.7"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

Here's a tiny app I use for testing:

import { KafkaConfig, KafkaMessage} from "kafkajs";
import { Workflow, WorkflowContext, Kafka, KafkaConsume } from '@dbos-inc/dbos-sdk';

const kafkaConfig: KafkaConfig = {
    brokers: [`localhost:9092`]
}

@Kafka(kafkaConfig)
export class KafkaExample {

  @KafkaConsume("dbos-topic")
  @Workflow()
  static async kafkaWorkflow(ctxt: WorkflowContext, topic: string, partition: number, message: KafkaMessage) {
    ctxt.logger.info(`Message received: ${message.value?.toString()}`)
  }
}


demetris-manikas commented May 7, 2024

Thanks. I had an app already.
Everything runs smoothly with your setup.
Thanks a lot for sharing.

Just for reference here is my config that produced the error mentioned.

  broker:
    image: apache/kafka:latest
    container_name: broker
    user: root
    ports:
      - '9092:9092'
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://broker:9092,PLAINTEXT://broker:19092'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: 'd86da882-0c82-11ef-8bb9-a757193d3343'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka_data:/tmp/kraft-combined-logs
    healthcheck:
      test: ["CMD-SHELL", "/opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server localhost:9092 || exit 1"]
      interval: 2s
      timeout: 12s
      retries: 4
      start_period: 2s

I tried the latest apache/kafka image, and that is probably where it all starts. I will live with ZooKeeper for a while...


demetris-manikas commented May 8, 2024

After some more digging, this is what I found.
Auto-creation of topics refers to creating a topic when a producer publishes to a non-existent topic.
In general it is considered bad practice, since there is not enough control over the number of partitions/replicas
(automatically created topics get the broker's default settings, so no granularity there).
The good practice is to set AUTO_CREATE_TOPICS to 'false' and create the topics declaratively.

I believe a better approach is to let the developer declare the topics in some config; on init, before starting any consumers, the app should check that the declared topics exist and create the missing ones, as sketched below.

This can be easily achieved using kafka.admin().listTopics() and kafka.admin().createTopics().
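
A minimal sketch of that idea using the kafkajs admin client (the function name, broker list, and topic list are assumptions for illustration, not part of DBOS):

import { Kafka } from "kafkajs";

// Sketch: ensure all declared topics exist before any consumer subscribes.
async function ensureTopicsExist(brokers: string[], declaredTopics: string[]) {
  const admin = new Kafka({ clientId: "topic-init", brokers }).admin();
  await admin.connect();
  try {
    const existing = await admin.listTopics();
    const missing = declaredTopics.filter((t) => !existing.includes(t));
    if (missing.length > 0) {
      // waitForLeaders: true blocks until the new topics have leaders,
      // so a consumer started right after this call can subscribe safely.
      await admin.createTopics({
        waitForLeaders: true,
        topics: missing.map((topic) => ({ topic })),
      });
    }
  } finally {
    await admin.disconnect();
  }
}

Called once on startup, e.g. await ensureTopicsExist(["localhost:9092"], ["dbos-topic"]), this avoids relying on broker-side auto-creation entirely.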

@demetris-manikas
Contributor Author

After setting AUTO_CREATE_TOPICS to false, the app keeps crashing in a loop.
Which means that if the broker has this setting, a single typo in the name of a topic will crash the app over and over.

@kraftp kraftp reopened this May 8, 2024

kraftp commented May 8, 2024

That's a good point! We can automatically create all decorated topics during startup if they don't already exist.

@demetris-manikas
Contributor Author

If it is going to be implemented this way (reading from the decorators instead of a config), all the topic-creation parameters should be available at the decorator level (maybe they are already there and I missed them).

As you have probably figured out already, I have some time to spare these days, so if you wish I could create a PR for this.

@demetris-manikas
Contributor Author

I have to say that I would prefer the file-based configuration solution.
This way one can have a clear view of the whole configuration in one place, instead of it being scattered across random files.
And maybe a linting rule could raise an error when the name in the decorator is not listed in the config (nice to have, but not really important).


kraftp commented May 8, 2024

On second thought, I want to hold off on this for now. I think that:

  • In a development environment, topic auto-creation is fine.
  • In a production environment, Kafka is likely configured independently of DBOS (for example, topics are likely created on the producer side or in IaC), and DBOS shouldn't also try to configure it.

DBOS works excellently as a Kafka consumer, but right now I don't think we should do Kafka cluster management.

@kraftp kraftp closed this as completed May 8, 2024
@demetris-manikas
Contributor Author

Great point there. I was thinking about it too.

@demetris-manikas
Contributor Author

I just found out that there is an open PR at kafkajs that, when merged, will fix the issue that started this whole conversation.
I tested it locally and it seems to do the job.
I wouldn't have bothered you had I found out about it earlier.
Sorry.


kraftp commented May 8, 2024

Nice find!


demetris-manikas commented May 8, 2024

Actually, tulios/kafkajs issue #1603 is more important.
It seems that kafkajs will not be getting any updates soon.

@demetris-manikas
Contributor Author

docker-compose.txt
As requested.
GitHub does not support .yaml attachments (that's what the .txt extension is for).
Rename it to .yaml;
docker compose up -d;
In dbos-transact: npm run build; npx jest kafka.test;

You should see this: KafkaJSProtocolError: This server does not host this topic-partition


kraftp commented May 9, 2024

Workaround for this particular issue until KafkaJS solves it: #434

@kraftp kraftp reopened this May 9, 2024
@kraftp kraftp mentioned this issue May 9, 2024

kraftp commented May 9, 2024

Addressed in #434 until KafkaJS fixes the bug

@kraftp kraftp closed this as completed May 9, 2024