diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index d698ecf2e9d00..6de72783f1fb5 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -278,8 +278,8 @@ private int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recor double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos; double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz; maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes); - // Ensure the value is not more than 10000L - maxNumberOfRecordsPerFetch = Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX); + // Ensure the value is greater than 0 and not more than 10000L + maxNumberOfRecordsPerFetch = Math.max(1, Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)); } return maxNumberOfRecordsPerFetch; }