Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ You can add breakpoints and so on after that.

## Updating librdkafka version

The librdkafka should be periodically updated to the latest release in https://github.com/edenhill/librdkafka/releases
The librdkafka should be periodically updated to the latest release in https://github.com/confluentinc/librdkafka/releases

Steps to update:
1. Update the `librdkafka` property in [`package.json`](https://github.com/confluentinc/confluent-kafka-js/blob/master/package.json) to the desired version.
Expand Down
1 change: 1 addition & 0 deletions LICENSE.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
The MIT License (MIT)
Copyright (c) 2016-2023 Blizzard Entertainment
2023 Confluent, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
64 changes: 64 additions & 0 deletions MIGRATION.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Migration Guide

## KafkaJS

### Common

1. Error Handling: Some possible subtypes of `KafkaJSError` have been removed,
and additional information has been added into `KafkaJSError`.
Internally, fields have been added denoting if the error is fatal, retriable, or abortable (the latter two only relevant for a
transactional producer).
Some error-specific fields have also been removed. An exhaustive list is at the bottom of this section.

For compability, as many error types as possible have been retained, but it is
better to switch to checking the `error.code`.

**Action**: Convert any checks based on `instanceof` and `error.name` or to error
checks based on `error.code` or `error.type`.

**Example:**:
```js
try {
await producer.send(/* args */);
} catch (error) {
if (!Kafka.isKafkaJSError(error)) { /* unrelated err handling */ }
else if (error.fatal) { /* fatal error, abandon producer */ }
else if (error.code === Kafka.ErrorCode.ERR__QUEUE_FULL) { /*...*/ }
else if (error.type === 'ERR_MSG_SIZE_TOO_LARGE') { /*...*/ }
/* and so on for specific errors */
}
```

Exhaustive list of error types and error fields removed:
1. `KafkaJSNonRetriableError`: retriable errors are automatically retried by librdkafka, so there's no need for this type.
Note that `error.retriable` still exists, but it's applicable only for transactional producer,
where users are expected to retry an action themselves.
All error types using this as a superclass now use `KafkaJSError` as their superclass.
2. `topic` and `partition` are removed from `KafkaJSOffsetOutOfRange`.
3. `KafkaJSMemberIdRequired`: removed as automatically handled by librdkafka.
4. `KafkaJSNumberOfRetriesExceeded`: removed as retries are handled by librdkafka.
5. `broker, correlationId, createdAt, sentAt` and `pendingDuration` are removed from `KafkaJSNumberOfRetriesExceeded`.
6. `KafkaJSMetadataNotLoaded`: removed as metadata is automatically reloaded by librdkafka.
7. `KafkaJSTopicMetadataNotLoaded`: removed as topic metadata is automatically reloaded by librdkafka.
8. `KafkaJSStaleTopicMetadataAssignment`: removed as it's automatically refreshed by librdkafka.
9. `KafkaJSDeleteGroupsError`: removed, as the Admin Client doesn't have this yet. May be added back again, or changed.
10. `KafkaJSServerDoesNotSupportApiKey`: removed, as this error isn't generally exposed to user in librdkafka. If raised,
it is subsumed into `KafkaJSError` where `error.code === Kafka.ErrorCode.ERR_UNSUPPORTED_VERSION`.
11. `KafkaJSBrokerNotFound`: removed, as this error isn't exposed directly to the user in librdkafka.
12. `KafkaJSLockTimeout`: removed, as such an error is not applicable while using librdkafka.
13. `KafkaJSUnsupportedMagicByteInMessageSet`: removed. It is subsumed into `KafkaJSError` where `error.code === Kafka.ErrorCode.ERR_UNSUPPORTED_VERSION`.
14. `KafkaJSDeleteTopicRecordsError`: removed, as the Admin Client doesn't have this yet. May be added back again, or changed.
15. `KafkaJSInvariantViolation`: removed, as it's not applicable to librdkafka. Errors in internal state are subsumed into `KafkaJSError` where `error.code === Kafka.ErrorCode.ERR__STATE`.
16. `KafkaJSInvalidVarIntError`: removed, as it's not exposed to the user in librdkafka.
17. `KafkaJSInvalidLongError`: removed, as it's not exposed to the user in librdkafka.
18. `KafkaJSCreateTopicError`: removed, as the Admin Client doesn't have this yet. May be added back again, or changed.
19. `KafkaJSAlterPartitionReassignmentsError`: removed, as the RPC is not used in librdkafka.
20. `KafkaJSFetcherRebalanceError`: removed, it's not exposed to the user in librdkafka.
21. `broker` is removed from `KafkaJSConnectionError`.
22. `KafkaJSConnectionClosedError`: removed, and subsumed into `KafkaJSConnectionError` as librdkafka treats them equivalently.

### Producer

### Consumer

## node-rdkafka
52 changes: 51 additions & 1 deletion lib/kafkajs/_common.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
const error = require("./_error");
const LibrdKafkaError = require('../error');

/**
* @function kafkaJSToRdKafkaConfig()
* @param {object} config
Expand Down Expand Up @@ -52,4 +55,51 @@ function topicPartitionOffsetToRdKafka(tpo) {
};
}

module.exports = { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka };
/**
* Convert a librdkafka error from node-rdkafka into a KafkaJSError.
* @param {LibrdKafkaError} librdKafkaError to convert from.
* @returns KafkaJSError
*/
function createKafkaJsErrorFromLibRdKafkaError(librdKafkaError) {
const properties = {
retriable: librdKafkaError.retriable,
fatal: librdKafkaError.fatal,
abortable: librdKafkaError.abortable,
stack: librdKafkaError.stack,
code: librdKafkaError.code,
};

let err = null;

if (properties.code === error.ErrorCodes.ERR_OFFSET_OUT_OF_RANGE) {
err = new error.KafkaJSOffsetOutOfRange(e, properties);
} else if (properties.code === error.ErrorCodes.ERR_REQUEST_TIMED_OUT) {
err = new error.KafkaJSRequestTimeoutError(e, properties);
} else if (properties.code === error.ErrorCodes.ERR__PARTIAL) {
err = new error.KafkaJSPartialMessageError(e, properties);
} else if (properties.code === error.ErrorCodes.ERR__AUTHENTICATION) {
err = new error.KafkaJSSASLAuthenticationError(e, properties);
} else if (properties.code === error.ErrorCodes.ERR_GROUP_COORDINATOR_NOT_AVAILABLE) {
err = new error.KafkaJSGroupCoordinatorNotAvailableError(e, properties);
} else if (properties.code === error.ErrorCodes.ERR__NOT_IMPLEMENTED) {
err = new error.KafkaJSNotImplemented(e, properties);
} else if (properties.code === error.ErrorCodes.ERR__TIMED_OUT) {
err = new error.KafkaJSTimedOut(e, properties);
} else if (properties.code === error.ErrorCodes.ERR__ALL_BROKERS_DOWN) {
err = new error.KafkaJSNoBrokerAvailableError(e, properties);
} else if (properties.code === error.ErrorCodes.ERR__TRANSPORT) {
err = new error.KafkaJSConnectionError(e, properties);
} else if (properties.code > 0) { /* Indicates a non-local error */
err = new error.KafkaJSProtocolError(e, properties);
} else {
err = new error.KafkaJSError(e, properties);
}

return err;
}

module.exports = {
kafkaJSToRdKafkaConfig,
topicPartitionOffsetToRdKafka,
createKafkaJsErrorFromLibRdKafkaError,
};
187 changes: 187 additions & 0 deletions lib/kafkajs/_error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
const LibrdKafkaError = require('../error');

/**
* @typedef {Object} KafkaJSError represents an error when using the promisified interface.
*/
class KafkaJSError extends Error {
/**
* @param {Error | string} error an Error or a string describing the error.
* @param {object} properties a set of optional error properties.
* @param {boolean} [properties.retriable=false] whether the error is retriable. Applies only to the transactional producer
* @param {boolean} [properties.fatal=false] whether the error is fatal. Applies only to the transactional producer.
* @param {boolean} [properties.abortable=false] whether the error is abortable. Applies only to the transactional producer.
* @param {string} [properties.stack] the stack trace of the error.
* @param {number} [properties.code=LibrdKafkaError.codes.ERR_UNKNOWN] the error code.
*/
constructor(e, { retriable = false, fatal = false, abortable = false, stack = null, code = LibrdKafkaError.codes.ERR_UNKNOWN } = {}) {
super(e, {});
this.name = 'KafkaJSError';
this.message = e.message || e;
this.retriable = retriable;
this.fatal = fatal;
this.abortable = abortable;
this.code = code;

if (stack) {
this.stack = stack;
} else {
Error.captureStackTrace(this, this.constructor);
}

const errTypes = Object
.keys(LibrdKafkaError.codes)
.filter(k => LibrdKafkaError.codes[k] === kjsErr.code);

if (errTypes.length !== 1) {
this.type = LibrdKafkaError.codes.ERR_UNKNOWN;
} else {
this.type = errTypes[0];
}
}
}

/**
* @typedef {Object} KafkaJSProtocolError represents an error that is caused when a Kafka Protocol RPC has an embedded error.
*/
class KafkaJSProtocolError extends KafkaJSError {
constructor() {
super(...arguments);
this.name = 'KafkaJSProtocolError';
}
}

/**
* @typedef {Object} KafkaJSOffsetOutOfRange represents the error raised when fetching from an offset out of range.
*/
class KafkaJSOffsetOutOfRange extends KafkaJSProtocolError {
constructor() {
super(...arguments);
this.name = 'KafkaJSOffsetOutOfRange';
}
}

/**
* @typedef {Object} KafkaJSConnectionError represents the error raised when a connection to a broker cannot be established or is broken unexpectedly.
*/
class KafkaJSConnectionError extends KafkaJSError {
constructor() {
super(...arguments);
this.name = 'KafkaJSConnectionError';
}
}

/**
* @typedef {Object} KafkaJSRequestTimeoutError represents the error raised on a timeout for one request.
*/
class KafkaJSRequestTimeoutError extends KafkaJSError {
constructor() {
super(...arguments);
this.name = 'KafkaJSRequestTimeoutError';
}
}

/**
* @typedef {Object} KafkaJSPartialMessageError represents the error raised when a response does not contain all expected information.
*/
class KafkaJSPartialMessageError extends KafkaJSError {
constructor() {
super(...arguments);
this.name = 'KafkaJSPartialMessageError';
}
}

/**
* @typedef {Object} KafkaJSSASLAuthenticationError represents an error raised when authentication fails.
*/
class KafkaJSSASLAuthenticationError extends KafkaJSError {
constructor() {
super(...arguments);
this.name = 'KafkaJSSASLAuthenticationError';
}
}

/**
* @typedef {Object} KafkaJSGroupCoordinatorNotFound represents an error raised when the group coordinator is not found.
*/
class KafkaJSGroupCoordinatorNotFound extends KafkaJSError {
constructor() {
super(...arguments);
this.name = 'KafkaJSGroupCoordinatorNotFound';
}
}

/**
* @typedef {Object} KafkaJSNotImplemented represents an error raised when a feature is not implemented for this particular client.
*/
class KafkaJSNotImplemented extends KafkaJSError {
constructor() {
super(...arguments);
this.name = 'KafkaJSNotImplemented';
}
}

/**
* @typedef {Object} KafkaJSTimeout represents an error raised when a timeout for an operation occurs (including retries).
*/
class KafkaJSTimeout extends KafkaJSError {
constructor() {
super(...arguments);
this.name = 'KafkaJSTimeout';
}
}

/**
* @typedef {Object} KafkaJSAggregateError represents an error raised when multiple errors occur at once.
*/
class KafkaJSAggregateError extends Error {
constructor(message, errors) {
super(message);
this.errors = errors;
this.name = 'KafkaJSAggregateError';
}
}

/**
* @typedef {Object} KafkaJSNoBrokerAvailableError represents an error raised when no broker is available for the operation.
*/
class KafkaJSNoBrokerAvailableError extends KafkaJSError {
constructor() {
super(...arguments);
this.name = 'KafkaJSNoBrokerAvailableError';
}
}

/**
* @function isRebalancing
* @param {KafkaJSError} e
* @returns boolean representing whether the error is a rebalancing error.
*/
const isRebalancing = e =>
e.type === 'REBALANCE_IN_PROGRESS' ||
e.type === 'NOT_COORDINATOR_FOR_GROUP' ||
e.type === 'ILLEGAL_GENERATION';

/**
* @function isKafkaJSError
* @param {any} e
* @returns boolean representing whether the error is a KafkaJSError.
*/
const isKafkaJSError = e => e instanceof KafkaJSError;

module.exports = {
KafkaJSError,
KafkaJSPartialMessageError,
KafkaJSProtocolError,
KafkaJSConnectionError,
KafkaJSRequestTimeoutError,
KafkaJSSASLAuthenticationError,
KafkaJSOffsetOutOfRange,
KafkaJSGroupCoordinatorNotFound,
KafkaJSNotImplemented,
KafkaJSTimeout,
KafkaJSAggregateError,
KafkaJSNoBrokerAvailableError,
isRebalancing,
isKafkaJSError,
ErrorCodes: LibrdKafkaError.codes,
};
7 changes: 4 additions & 3 deletions lib/kafkajs/_kafka.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const { Producer } = require("./_producer");
const { Consumer } = require("./_consumer");
const { Producer } = require('./_producer');
const { Consumer } = require('./_consumer');
const error = require('./_error');

class Kafka {
#commonClientConfig = {};
Expand Down Expand Up @@ -32,4 +33,4 @@ class Kafka {
}
}

module.exports = { Kafka };
module.exports = { Kafka, ...error };