Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding testing architecture and KCL 2.x basic polling/streaming tests #1136

Merged
merged 18 commits into from
Jun 21, 2023

Conversation

mmankika-aws
Copy link
Contributor

Issue #, if available:

Description of changes:
Adding testing architecture to implement automated integration tests.
Added KCL2XIntegrationTests which implements basic polling and streaming consumer tests against a single producer.
Integration tests require AWS credentials to be provided. The integration tests can be run with the command
mvn -Dit.test=KCLV2IntegrationTest -Dcredentials="<IAM_USER>" verify

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Copy link
Contributor

@stair-aws stair-aws left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial review; need to divert to other work. To expedite the review process, please apply any learnings/comments to all code (which may not have been reviewed yet).


public StreamExistenceManager(KCLAppConfig config) throws URISyntaxException, IOException {
this.testConfig = config;
this.client = config.buildAsyncKinesisClient(Protocol.HTTP1_1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ignores any protocol specified by the KCLAppConfig

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This client is for the DescribeStreams API call. For this, it does not matter which HTTP protocol is being used so a default is put here. When making PutRecord, GetRecord calls, the Consumer Protocol specified in KCLAppConfig is used. However, I will update this so that they are consistent.

@stair-aws stair-aws added the v2.x Issues related to the 2.x version label Jun 13, 2023
amazon-kinesis-client/pom.xml Outdated Show resolved Hide resolved
Instant instant = d.atZone( ZoneId.systemDefault() ).toInstant();
Date startStreamTime = Date.from( instant );

InitialPositionInStreamExtended initialPosition = InitialPositionInStreamExtended
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FMI why we chose to start at (current time -5 min)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to start processing records that were put onto the stream in this test case. This is just a small value that estimates approximately when the test case may have started (related to the time that the consumer has finished getting started and is now processing records).

Copy link
Contributor

@stair-aws stair-aws left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial review.

public class BasicStreamingPollingIntegrationTest {

@Test
public void KCLReleaseCanaryPollingH2Test() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

method name should always start with small letters, also better naming convention for tests is "MethodName_StateUnderTest_ExpectedBehavior" ref https://medium.com/@stefanovskyi/unit-test-naming-conventions-dd9208eadbea

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also better naming convention for tests is "MethodName_StateUnderTest_ExpectedBehavior" ref https://medium.com/@stefanovskyi/unit-test-naming-conventions-dd9208eadbea

The above statement is subjective.

Fact: Java method naming convention is lowerCamelCase.

From Javadoc wiki:

Writing comments and Javadoc is for better understanding the code and thus better maintaining it.


Opinion 1: Natural language (e.g., English) is much better at communicating intent than anAbridgedCamelCaseMashup, regardless of whether underscores are used as delimiters.
Opinion 2: The proposed method naming suggestion feels like a manifestation of Law of the instrument. (e.g., "If the only tool you have is a hammer, it is tempting to treat everything as if it were a nail")

Suggestion: if the test and intent cannot be made evident w/ a conventional method name, add Javadoc. Document the intent, preconditions, postconditions, and anything else that a future maintainer might find useful or informative. Natural language does not have compile-time constraints, and provides a much broader palette on which the author may convey details.


public class RecordValidatorQueueTest {

private RecordValidatorQueue recordValidator = new RecordValidatorQueue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

// Setup configuration of KCL (including DynamoDB and CloudWatch)
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestRecordProcessorFactory(recordValidator));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConfigBuild creation logic is in KCLAppConfig already, re use that and avoid re-creating the same clients again. The clients like ddb kcl are created with credentials already, no need to create them again.


RecordValidatorQueue recordValidator;

public TestRecordProcessorFactory(RecordValidatorQueue recordValidator) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gives a compile error

Comment on lines 89 to 93
public final KinesisAsyncClient buildAsyncKinesisClient(Protocol protocol) throws URISyntaxException, IOException {
if (kinesisAsyncClient == null) {
this.kinesisAsyncClient = buildAsyncKinesisClient(Optional.ofNullable(protocol));
}
return this.kinesisAsyncClient;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is duplicate method, remove this.


private String shardId;

RecordValidatorQueue recordValidator;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private final

try {
log.info("Processing {} record(s)", processRecordsInput.records().size());

for (KinesisClientRecord r : processRecordsInput.records()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

np r -> kinesisRecord or similar. Dont use single alphabet words

log.info("Processing {} record(s)", processRecordsInput.records().size());

for (KinesisClientRecord r : processRecordsInput.records()) {
String data = new String(asByteArray(r.data()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

int nextVal = Integer.parseInt(record);
if (prevVal > nextVal) {
log.error("The records are not in increasing order. Saw record data {} before {}.", prevVal, nextVal);
shardIncOrder = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after this point, no need to check the next records of this shard or even other shards, test is going to fail anyways, Fail here directly and go back

public RecordValidationStatus validateRecords(int expectedShardCount) {

// Validate that each List in the HashMap has data records in increasing order
boolean incOrder = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used anymore, remove

}

// If this is true, then there was some record that was missed during processing.
if (actualShardCount != expectedShardCount) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not shardCount, these are record counts. Rename the variables

validateRecordProcessor();

// Clean up resources created
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this delay added for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is sometimes a delay in the workers getting shutdown and deleted. This delay ensures that that process is complete before we try to delete any resources.


// Clean up resources created
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
deleteResources(streamExistenceManager, leaseTableManager);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move deleteResources to finally block, this is something you need to do in both try and catch block right

} catch (InterruptedException e) {
log.info("Interrupted, assuming shutdown.");
} catch (ExecutionException | RuntimeException e) {
log.error("Error during publish records");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log exception as well

Comment on lines 184 to 185
log.error("Error creating payload data for {}", payloadCounter.toString());
throw new RuntimeException("Error converting object to bytes: ", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skip logging of the error, you are throwing it so this should be logged once at top level

} catch (InterruptedException e) {
log.info("Interrupted while waiting for graceful shutdown. Continuing.");
} catch (ExecutionException | TimeoutException e) {
throw new Exception("Exception while executing graceful shutdown. {}", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont throw Exception of class Exception. The captured exception already has enough info just throw that same

Comment on lines 61 to 66
final ListTablesRequest request = ListTablesRequest.builder().build();
ListTablesResponse response = null;
while(response == null || response.lastEvaluatedTableName() != null) {
try {
response = FutureUtils.resolveOrCancelFuture(dynamoClient.listTables(request), Duration.ofSeconds(60));
} catch (ExecutionException | InterruptedException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when you make a paginated call again, the request has to include the token from last call to make progress. Else this will keep on just getting same results with same request.

Also try to abstract the common code for this method as well and just keep API request creation and API call here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing the request that is made. With this new change there are many places in this method that are resource dependent (the initialization at the top, while condition, try block, addAll call...) that would need to be implemented in the resource specific managers that I think that abstracting this would harm readability more than it would help.

@Slf4j
public abstract class AWSResourceManager {

public AWSResourceManager() {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use lombok for this.

Comment on lines 28 to 29
public LeaseTableManager(DynamoDbAsyncClient dynamoClient) throws URISyntaxException, IOException {
this.dynamoClient = dynamoClient;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use lombok's @AllArgConstructor

abhit17
abhit17 previously approved these changes Jun 21, 2023
@stair-aws stair-aws dismissed their stale review June 21, 2023 20:27

Reviewed by abhit17@

@mmankika-aws mmankika-aws merged commit 53dbb4e into awslabs:master Jun 21, 2023
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
v2.x Issues related to the 2.x version
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants