Admin Operations
KafkaPublisher exposes a typed admin surface for listing, describing, and provisioning topics — no need to install a second client just to bootstrap a topic in CI or a test.
The admin surface is deliberately a subset of the full Kafka AdminClient. ACL management, quota inspection, and consumer-group operations are intentionally out of scope; reach for the raw driver admin when you need them.
Borrow a connected admin client:
const admin = await publisher.admin();
try {
  const topics = await admin.listTopics();
  const desc = await admin.describeTopics(["orders.created"]);
  console.log(desc[0].partitions.length);
} finally {
  await admin.close();
}

The returned KafkaAdmin is already connected; the caller owns the lifecycle and must close() it.
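The borrow-then-close pattern above can be wrapped in a small helper so that callers cannot forget the `finally`. This is a minimal sketch, not part of eventferry: `withAdmin`, `AdminLike`, and `AdminSource` are hypothetical names, and the interfaces assume only the `admin()` and `close()` calls shown above.

```typescript
// Hypothetical minimal shapes. Only admin() and close() are taken from the
// documented surface; everything else here is illustrative.
interface AdminLike {
  close(): Promise<void>;
}

interface AdminSource<A extends AdminLike> {
  admin(): Promise<A>;
}

// Borrow an admin, run `fn` with it, and always close it afterwards,
// whether `fn` resolves or rejects.
async function withAdmin<A extends AdminLike, T>(
  source: AdminSource<A>,
  fn: (admin: A) => Promise<T>,
): Promise<T> {
  const admin = await source.admin();
  try {
    return await fn(admin);
  } finally {
    await admin.close();
  }
}
```

Assuming a publisher object satisfies `AdminSource`, a call would read `await withAdmin(publisher, (admin) => admin.listTopics())`.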
Methods on KafkaAdmin:
| Method | Returns | Notes |
|---|---|---|
| listTopics() | string[] | All topic names visible to this principal, including internal topics. |
| describeTopics(topics) | TopicMetadata[] | Partition / leader / ISR per topic. Missing topics return partitions: [], so no try/catch is needed. |
| createTopics(specs) | void | Idempotent. Existing topics are silently skipped (TOPIC_ALREADY_EXISTS is swallowed). |
| createPartitions(specs) | void | Grow a topic's partition count. Kafka does not support shrinking. |
| close() | void | Disconnect. |
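Because describeTopics reports a missing topic as an entry with an empty partition list, a missing-topic check reduces to a filter. The sketch below is illustrative only: `TopicMetadataLike` is a hypothetical subset of TopicMetadata, and the `name` field in particular is an assumption, since only `partitions` appears in the example above.

```typescript
// Hypothetical subset of TopicMetadata. `partitions` is shown in the docs;
// the `name` field is an assumption made for this sketch.
interface TopicMetadataLike {
  name: string;
  partitions: unknown[];
}

// Return the names of topics that came back with no partitions,
// which is how a missing topic is reported.
function missingTopics(described: TopicMetadataLike[]): string[] {
  return described
    .filter((t) => t.partitions.length === 0)
    .map((t) => t.name);
}
```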
Idempotent provisioning on top of the admin surface:
await publisher.ensureTopics([
  { topic: "orders.created", numPartitions: 12, replicationFactor: 3 },
  { topic: "orders.created.dlq", numPartitions: 3, replicationFactor: 3,
    configEntries: { "retention.ms": "604800000" } },
]);

What it does:
- Creates topics that don't exist.
- Skips topics that already exist — no error, no surprise alter.
What it does NOT do (by design):
- Reconcile replication factor on existing topics — Kafka has no safe in-place alter (replication changes require partition reassignment).
- Reconcile configEntries on existing topics. Use kafka-configs.sh or the raw admin client's alterConfigs if you need that.
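The create-or-skip behaviour described above can be pictured as a planning step over the requested specs and the broker's current topic list. This is a sketch under assumptions, not eventferry's implementation: `TopicSpec`, `EnsurePlan`, and `planEnsureTopics` are hypothetical names, and the spec fields simply mirror the ones used in the ensureTopics call.

```typescript
// Hypothetical spec shape mirroring the fields used in the ensureTopics call.
interface TopicSpec {
  topic: string;
  numPartitions?: number;
  replicationFactor?: number;
  configEntries?: Record<string, string>;
}

interface EnsurePlan {
  create: TopicSpec[]; // topics that do not exist yet
  skip: string[];      // existing topics, left untouched whatever their settings
}

// Split the requested specs into "create" and "skip" based on what exists.
// Existing topics are never compared against the requested settings.
function planEnsureTopics(specs: TopicSpec[], existing: string[]): EnsurePlan {
  const present = new Set(existing);
  const plan: EnsurePlan = { create: [], skip: [] };
  for (const spec of specs) {
    if (present.has(spec.topic)) {
      plan.skip.push(spec.topic);
    } else {
      plan.create.push(spec);
    }
  }
  return plan;
}
```

Note that a spec for an existing topic lands in `skip` even when its requested replication factor or config differs, which matches the "no surprise alter" rule.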
Use { growPartitions: true } to also expand existing topics whose partition count is below the requested numPartitions:
await publisher.ensureTopics(
  [{ topic: "orders.created", numPartitions: 24 }],
  { growPartitions: true },
);

Warning: growing partitions changes the key-hash distribution. With hash-modulo partitioning, a key whose records previously landed on partition 3 of 12 may now land on partition 15 of 24, while its older records stay on partition 3. If your consumers rely on strict per-key ordering and have committed offsets across the old partition layout, growing partitions mid-flight breaks that ordering. Treat partition growth as a planned migration, not a hot-path operation.
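To make the remapping concrete, the sketch below counts how many keys change partition when a topic grows from 12 to 24 partitions under a hash-modulo partitioner. It is an illustration under assumptions: FNV-1a stands in for the real hash (the Java client's default partitioner uses murmur2, and other clients may differ), and `fnv1a`, `partitionFor`, `countMovedKeys`, and the `order-N` keys are all hypothetical.

```typescript
// FNV-1a (32-bit) as a stand-in hash. Real clients use other functions,
// but the modulo effect shown here does not depend on the hash choice.
function fnv1a(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0; // unsigned 32-bit result
}

// Hash-modulo partition assignment.
function partitionFor(key: string, numPartitions: number): number {
  return fnv1a(key) % numPartitions;
}

// Count the keys whose partition differs between the two layouts.
function countMovedKeys(keys: string[], before: number, after: number): number {
  return keys.filter((k) => partitionFor(k, before) !== partitionFor(k, after)).length;
}

const sampleKeys = Array.from({ length: 1000 }, (_, i) => `order-${i}`);
const movedCount = countMovedKeys(sampleKeys, 12, 24);
console.log(`${movedCount} of ${sampleKeys.length} keys changed partition (12 -> 24)`);
```

Records already written for a moved key stay on the old partition, so a consumer can see new records for that key on one partition before it has drained the old ones on another.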
Fail-fast at startup if expected topics are missing:
new KafkaPublisher({
  brokers,
  validateTopicsOnConnect: ["orders.created", "orders.created.dlq", "events"],
});

connect() opens an admin, runs listTopics, and throws a single descriptive error naming every missing topic. The admin is always closed (success or failure).
An empty list or an omitted option skips the check entirely.
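The check described above amounts to a set difference followed by a single throw. The following is a rough sketch of that logic, not the library's code: `findMissingTopics` and `assertTopicsExist` are hypothetical names, and the error message text is invented for illustration.

```typescript
// Return every expected topic that is absent from the broker's topic list.
function findMissingTopics(expected: string[], existing: string[]): string[] {
  const present = new Set(existing);
  return expected.filter((t) => !present.has(t));
}

// Throw one error naming all missing topics. An empty or omitted list
// skips the check. The message wording here is an assumption.
function assertTopicsExist(expected: string[] | undefined, existing: string[]): void {
  if (!expected || expected.length === 0) return;
  const missing = findMissingTopics(expected, existing);
  if (missing.length > 0) {
    throw new Error(`Missing Kafka topics: ${missing.join(", ")}`);
  }
}
```

Collecting all missing names before throwing means an operator sees the full list in one failed deploy instead of fixing topics one restart at a time.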
Why this matters: Managed Kafka (Confluent Cloud, MSK Serverless) often runs with auto.create.topics.enable=false. Without validateTopicsOnConnect, your service connects happily, sits idle until the first publish, then fails per-record with "topic doesn't exist." Catching it at startup means your /readyz fails immediately and your deploy never goes green.
const publisher = new KafkaPublisher({ brokers, /* ... */ });
await publisher.connect();
if (process.env.NODE_ENV !== "production") {
  // In dev / CI / tests, provision the topics we need.
  await publisher.ensureTopics([
    { topic: "orders.created", numPartitions: 12, replicationFactor: 1 },
    { topic: "orders.created.dlq", numPartitions: 3, replicationFactor: 1 },
  ]);
} else {
  // In prod, just fail fast if the operator hasn't provisioned them.
  // (validateTopicsOnConnect ran inside .connect() already)
}

In production, topics are infrastructure, provisioned by Terraform / Confluent Cloud UI / kafka-tooling, not by the application. eventferry's ensureTopics is for tests and bootstrapped dev environments.
Custom drivers that don't implement the optional admin?() method get a clear error from publisher.admin():
KafkaPublisher: configured driver does not implement admin().
Use the built-in kafkajs or confluent driver, or extend your custom driver.
publisher.healthCheck() and validateTopicsOnConnect degrade similarly when the driver has no admin(): healthCheck() returns { ok: false, error }, and validate-on-connect is skipped with a warning.
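A readiness endpoint can translate that health result into an HTTP status. The sketch below is a hedged illustration: `HealthResult`, `ReadinessResponse`, and `toReadiness` are hypothetical, and the success shape `{ ok: true }` is an assumption, since only the failing form is documented here.

```typescript
// Assumed result shape. Only the failing form { ok: false, error } is
// documented; the success form is a guess for this sketch.
interface HealthResult {
  ok: boolean;
  error?: unknown;
}

interface ReadinessResponse {
  status: 200 | 503;
  body: { ready: boolean; reason?: string };
}

// Map a health result to a readiness response: 200 when healthy,
// 503 with a human-readable reason otherwise.
function toReadiness(health: HealthResult): ReadinessResponse {
  if (health.ok) {
    return { status: 200, body: { ready: true } };
  }
  const reason =
    health.error instanceof Error ? health.error.message : String(health.error);
  return { status: 503, body: { ready: false, reason } };
}
```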
- Health checks → Observability
- Production bootstrap checklist → Operations Guide
- Configure the publisher → Kafka Publisher
Repository · Issues · npm: @eventferry/all · MIT