Skip to content

Commit

Permalink
[PIP-156][fix][ci] fixed KinesisSinkTest record format error by deagg…
Browse files Browse the repository at this point in the history
…regation
  • Loading branch information
heesung-sn committed Apr 26, 2022
1 parent a0ae79a commit c17f0bf
Showing 1 changed file with 47 additions and 24 deletions.
Expand Up @@ -21,6 +21,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectReader;
import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
Expand All @@ -46,7 +48,10 @@
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
Expand Down Expand Up @@ -189,38 +194,56 @@ private void internalValidateSinkResult(Map<String, String> kvs) {

Map<String, String> actualKvs = new LinkedHashMap<>();

// millisBehindLatest equals zero when record processing is caught up,
// and there are no new records to process at this moment.
// See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
Awaitility.await().until(() -> addMoreRecordsAndGetMillisBehindLatest(actualKvs, iterator) == 0);
addMoreRecords(actualKvs, iterator);

assertEquals(actualKvs, kvs);
}

@SneakyThrows
private Long addMoreRecordsAndGetMillisBehindLatest(Map<String, String> kvs, String iterator) {
final GetRecordsResponse response = client.getRecords(
GetRecordsRequest
.builder()
.shardIterator(iterator)
.build())
.get();
if(response.hasRecords()) {
for (Record record : response.records()) {
String data = record.data().asString(StandardCharsets.UTF_8);
if (withSchema) {
JsonNode payload = READER.readTree(data).at("/payload");
String i = payload.at("/value/field1").asText();
assertEquals(payload.at("/value/field2").asText(), "v2_" + i);
assertEquals(payload.at("/key/field1").asText(), "f1_" + i);
assertEquals(payload.at("/key/field2").asText(), "f2_" + i);
kvs.put(i, i);
} else {
kvs.put(record.partitionKey(), data);
private void parseRecordData(Map<String, String> actualKvs, String data, String partitionKey) {
if (withSchema) {
JsonNode payload = READER.readTree(data).at("/payload");
String i = payload.at("/value/field1").asText();
assertEquals(payload.at("/value/field2").asText(), "v2_" + i);
assertEquals(payload.at("/key/field1").asText(), "f1_" + i);
assertEquals(payload.at("/key/field2").asText(), "f2_" + i);
actualKvs.put(i, i);
} else {
actualKvs.put(partitionKey, data);
}
}

@SneakyThrows
private void addMoreRecords(Map<String, String> actualKvs, String iterator) {
GetRecordsResponse response;
List<KinesisClientRecord> aggRecords = new ArrayList<>();
do {
GetRecordsRequest request = GetRecordsRequest.builder().shardIterator(iterator).build();
response = client.getRecords(request).get();
if (response.hasRecords()) {
for (Record record : response.records()) {
// KinesisSink uses KPL with aggregation enabled (by default).
// However, due to the async state initialization of the KPL internal ShardMap,
// the first sinked records might not be aggregated in Kinesis.
// ref: https://github.com/awslabs/amazon-kinesis-producer/issues/131
try {
String data = record.data().asString(StandardCharsets.UTF_8);
parseRecordData(actualKvs, data, record.partitionKey());
} catch (UncheckedIOException e) {
aggRecords.add(KinesisClientRecord.fromRecord(record));
}
}
}
iterator = response.nextShardIterator();
// millisBehindLatest equals zero when record processing is caught up,
// and there are no new records to process at this moment.
// See https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-response-MillisBehindLatest
} while (response.millisBehindLatest() != 0);

for (KinesisClientRecord record : new AggregatorUtil().deaggregate(aggRecords)) {
String data = new String(record.data().array(), StandardCharsets.UTF_8);
parseRecordData(actualKvs, data, record.partitionKey());
}
return response.millisBehindLatest();
}

@Data
Expand Down

0 comments on commit c17f0bf

Please sign in to comment.