-
-
Notifications
You must be signed in to change notification settings - Fork 0
Authentication and TLS
Every option for getting KafkaPublisher to authenticate against a real broker — TLS, SASL/PLAIN, SCRAM, OAUTHBEARER, mTLS, dev clusters with self-signed certs.
eventferry treats TLS verification as non-negotiable. There is no rejectUnauthorized: false knob and there never will be — see Dev cluster gotchas for the right pattern.
new KafkaPublisher({
brokers: ["broker:9093"],
ssl: true,
});Uses the driver's default trust store (Node's system CAs on kafkajs, librdkafka's default on confluent). Works against any broker with a cert signed by a public CA.
Client certificate + key for broker-side auth:
import { readFileSync } from "node:fs";
new KafkaPublisher({
brokers: ["broker:9093"],
ssl: {
ca: readFileSync("/etc/ssl/kafka-ca.pem"),
cert: readFileSync("/etc/ssl/client.pem"),
key: readFileSync("/etc/ssl/client-key.pem"),
passphrase: "encrypted-key-password", // optional
servername: "broker.example.com", // SNI override, optional
},
});| Field | What |
|---|---|
ca |
PEM-encoded CA bundle. String, Buffer, or array of either. |
cert |
Client cert PEM. Required for mTLS. |
key |
Client private key PEM. Required for mTLS. |
passphrase |
Password for an encrypted private key. |
servername |
SNI host override. Honored on kafkajs; documented no-op on confluent. |
Plain-text password over TLS. Used by Aiven, Confluent Cloud (legacy), self-managed clusters:
new KafkaPublisher({
brokers,
ssl: true,
sasl: {
mechanism: "plain",
username: "<api-key>",
password: "<api-secret>",
},
});Stronger hash, same surface:
new KafkaPublisher({
brokers,
ssl: true,
sasl: {
mechanism: "scram-sha-512",
username,
password,
},
});Both mechanisms work on both drivers. The publisher maps them to the underlying client's native SASL config — no translation surprises.
The bring-your-own-token mechanism. eventferry calls your provider when the client needs a token:
new KafkaPublisher({
brokers,
ssl: true,
sasl: {
mechanism: "oauthbearer",
oauthBearerProvider: async () => ({
value: await getJwtToken(),
principal: "service-name", // REQUIRED on confluent, ignored on kafkajs
lifetime: 3600_000, // REQUIRED on confluent (ms), ignored on kafkajs
extensions: { scope: "kafka.write" }, // optional OIDC extensions
}),
},
});| Field | kafkajs | confluent |
|---|---|---|
value |
required | required |
principal |
ignored | required (driver throws without it) |
lifetime |
ignored | required, milliseconds |
extensions |
ignored | optional |
For cross-driver portable code, always populate all four. Setting them on kafkajs is a no-op there.
The driver calls oauthBearerProvider on demand, not on a timer. If you don't cache, you'll re-issue a token on every reconnect / partition leader change. Wrap the call:
let cached: { value: string; expiry: number } | null = null;
const oauthBearerProvider = async () => {
if (cached && cached.expiry - Date.now() > 60_000) return cached;
const token = await issueFreshToken();
cached = { value: token, expiry: parseExpiryFromJwt(token) };
return { value: cached.value, principal: "...", lifetime: cached.expiry - Date.now() };
};For AWS MSK IAM, eventferry ships a turnkey version → AWS MSK IAM.
eventferry derives the security protocol automatically from what you set:
| Set | Computed security.protocol
|
|---|---|
ssl: undefined + no sasl
|
plaintext |
ssl: true/{} + no sasl
|
ssl |
no ssl + sasl
|
sasl_plaintext |
ssl: true/{} + sasl
|
sasl_ssl |
You never set security.protocol directly. If you do, rawProducerConfig's override wins — but you shouldn't need it.
The right pattern is to pin your CA. Verification still happens — just against your CA instead of the system trust store.
new KafkaPublisher({
brokers: ["dev-broker.internal:9093"],
ssl: {
ca: readFileSync("/path/to/dev-cluster-ca.pem"),
servername: "kafka.dev.internal", // when DNS doesn't match cert SAN
},
});Never add rejectUnauthorized: false (TypeScript would reject it anyway — it's not in the type). That disables verification entirely and opens every connection to a man-in-the-middle.
When the broker address is an IP and the cert was issued for a hostname, set servername:
new KafkaPublisher({
brokers: ["10.0.5.12:9093"], // IP literal
ssl: {
ca: readFileSync("/etc/ssl/kafka-ca.pem"),
servername: "broker.example.com", // hostname the cert was issued for
},
});servername is honored by the kafkajs driver (Node's tls.connect reads it directly). It's a documented no-op on the confluent driver — librdkafka v1.x's kafkaJS-compat layer doesn't expose an SNI override, and SNI is derived from the broker address. Use the kafkajs driver when you need the SNI lever.
Not currently shipped. It's an enterprise on-prem-only mechanism that needs librdkafka linked against a Kerberos client (cyrus-sasl-gssapi). If you need it, file an issue with your deployment shape — the confluent driver supports it natively, so the work is exposing the right config.
- AWS-specific MSK IAM helper → AWS MSK IAM
- Producer tuning + driver choice → Kafka Publisher
- Production deployment checklist → Operations Guide
Repository · Issues · npm: @eventferry/all · MIT
Get going
Adapters
Type & schema
Security
Operational
- Transactions and EOS
- Admin Operations
- Observability
- Consuming Events
- Dead-Letter Queue
- Reliability and Error Handling
Operations
Reference