Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -461,17 +461,6 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
} else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
} else {
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
if (isAtLeastOnceProcessing()
&& committedOffset != null
&& committedOffset.offset() > record.offset()
&& commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
// Ensures that after a topology with this id is started, the consumer fetch
// position never falls behind the committed offset (STORM-2844)
throw new IllegalStateException("Attempting to emit a message that has already been committed."
+ " This should never occur when using the at-least-once processing guarantee.");
}

final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
if (isEmitTuple(tuple)) {
final boolean isScheduled = retryService.isScheduled(msgId);
Expand Down