Skip to content

Commit

Permalink
reflect consumer failure in connection status
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Aug 16, 2021
1 parent 8d3f8b6 commit c2bf3b1
Showing 1 changed file with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectivityInternalErrorException;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.Enforcement;
Expand Down Expand Up @@ -256,19 +255,27 @@ private void start() throws IllegalStateException {

private void handleStreamCompletion(@Nullable final Done done, @Nullable final Throwable throwable) {
final ConnectivityStatus status;
final ResourceStatus statusUpdate;
final Instant now = Instant.now();
if (null == throwable) {
status = ConnectivityStatus.CLOSED;
statusUpdate = ConnectivityModelFactory.newStatusUpdate(
InstanceIdentifierSupplier.getInstance().get(),
status,
sourceAddress,
"Consumer closed", now);
} else {
log.debug("Consumer failed with error! <{}: {}>", throwable.getClass().getSimpleName(),
throwable.getMessage());
status = connectivityStatusResolver.resolve(throwable);
escalate(throwable, "Unexpected consumer failure.");
statusUpdate = ConnectivityModelFactory.newStatusUpdate(
InstanceIdentifierSupplier.getInstance().get(),
status,
sourceAddress,
ConnectionFailure.determineFailureDescription(now, throwable,
"Kafka consumer failed."), now);
}
final ResourceStatus statusUpdate = ConnectivityModelFactory.newStatusUpdate(
InstanceIdentifierSupplier.getInstance().get(),
status,
sourceAddress,
"Consumer closed", Instant.now());
handleAddressStatus(statusUpdate);
}

Expand Down

0 comments on commit c2bf3b1

Please sign in to comment.