Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.component.kafka.consumer.errorhandler;

import org.apache.camel.component.kafka.KafkaFetchRecords;
import org.apache.camel.component.kafka.PollExceptionStrategy;
import org.apache.kafka.common.errors.AuthenticationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,11 +44,19 @@ public boolean canContinue() {

@Override
public void handle(long partitionLastOffset, Exception exception) {
LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy");
if (exception instanceof AuthenticationException) {
LOG.warn("Kafka reported a non-recoverable authentication error. The client will not reconnect");

// disable reconnect: authentication errors are non-recoverable
recordFetcher.setReconnect(false);
recordFetcher.setConnected(false);
} else {
LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy");

// re-connect so the consumer can try the same message again
recordFetcher.setReconnect(true);
recordFetcher.setConnected(false);
// re-connect so the consumer can try the same message again
recordFetcher.setReconnect(true);
recordFetcher.setConnected(false);
}

// to close the current consumer
retry = false;
Expand Down