Hi there,
Our team has recently written the following code for ensuring that the PutRecordsRequest we send to the AmazonKinesisClient does not exceed the maximum entry limit of 500.
Is there any reason why this kind of helper method isn't included in the SDK already? It would mean one less thing to worry about when putting the records onto the kinesis stream :)
private static final int KINESIS_MAX_BATCH_SIZE = 500;
private void putRecords( List<PutRecordsRequestEntry> putRecordsRequestEntries,
String streamName )
{
if ( !putRecordsRequestEntries.isEmpty() )
{
int size = putRecordsRequestEntries.size();
int fullChunks = ( size - 1 ) / KINESIS_MAX_BATCH_SIZE;
Stream<List<PutRecordsRequestEntry>> requestEntriesStream = IntStream.range( 0, fullChunks + 1 )
.mapToObj( n -> putRecordsRequestEntries.subList( n * KINESIS_MAX_BATCH_SIZE,
n == fullChunks ? size : ( n + 1 ) * KINESIS_MAX_BATCH_SIZE ) );
requestEntriesStream.forEach( recordsEntryList ->
{
putRecords( new PutRecordsRequest().withRecords( recordsEntryList ).withStreamName( streamName ) );
} );
}
}
Thanks,
Lewis
Hi there,
Our team has recently written the following code for ensuring that the
PutRecordsRequestwe send to theAmazonKinesisClientdoes not exceed the maximum entry limit of 500.Is there any reason why this kind of helper method isn't included in the SDK already? It would mean one less thing to worry about when putting the records onto the kinesis stream :)
Thanks,
Lewis