
Fix to shut down PrefetchRecordsPublisher in a graceful manner #857

Merged

Conversation

madagascar22
Contributor

Description of changes:

Previously, when the lease expired, PrefetchRecordsPublisher shut down
the process forcefully by interrupting the threads,
which led to a leak of Apache HTTP client connections.
The code has now been changed to shut down the PrefetchRecordsPublisher
process in a more graceful manner and to handle InterruptedException.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
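
For context, a minimal sketch of the change described above, using a plain java.util.concurrent.ExecutorService. This is illustrative only; the class and field names are not taken from the PR.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; not the actual PrefetchRecordsPublisher code.
final class ShutdownSketch {
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    // Previous behaviour: interrupt the worker threads immediately, which could
    // leave an in-flight GetRecords call without a chance to release its
    // Apache HTTP client connection.
    void shutdownForcefully() {
        executorService.shutdownNow();
    }

    // New behaviour: stop accepting new work, give in-flight work up to
    // 60 seconds to finish, and only then force termination.
    void shutdownGracefully() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}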

Monishkumar Gajendran added 2 commits October 5, 2021 09:50
Previously, when the lease expired, PrefetchRecordsPublisher shut down
the process forcefully by interrupting the threads,
which led to a leak of Apache HTTP client connections.
The code has now been changed to shut down the PrefetchRecordsPublisher
process in a more graceful manner and to handle InterruptedException.
@@ -78,6 +78,9 @@
@KinesisClientInternalApi
public class PrefetchRecordsPublisher implements RecordsPublisher {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
// Since this package is being used by all KCL clients keeping the upper threshold of 60 seconds
private static final Duration AWAIT_TERMINATION_TIMEOUT = Duration.ofSeconds(60);
Contributor

How did we get this value? Were any tests run?

Contributor Author

Keeping this as an upper threshold value for a prefetcher to end gracefully; otherwise we will forcefully terminate.

…isher

Since clients can configure their own awaitTerminationTimeoutMillis,
add it as a separate parameter with a default value
}
} catch (InterruptedException e) {
// Preserve interrupt status
Thread.currentThread().interrupt();
Contributor

Shouldn't we set the interrupt flag after shutting down the thread-pool in the following line?

Contributor Author

Oh yes, we need to shut down first and then set the interrupt, or else the shutdown will fail. I will update it accordingly.

try {
resetLock.readLock().lock();
makeRetrievalAttempt();
} catch(PositionResetException pre) {
log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId);
} catch (Throwable e) {
Contributor

I'm not sure why we wrote this exception handling this way originally, but I feel like making this layer catch InterruptedException and making the final layer catch Throwable would be a bit more elegant.

Contributor Author

Since we are going to log the error message regardless of whether it is an InterruptedException or any other Throwable, it is good to handle InterruptedException inside the Throwable catch block; otherwise we may simply create a separate function for the log message.
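
As an illustration of the layering suggested above, here is a sketch where interruption is handled at this level and everything else falls through to a broad Throwable handler. The class, logger, and attemptRetrieval() method are placeholders, not the PR's actual code (which does this work in the daemon's makeRetrievalAttempt() run loop); slf4j is assumed because the surrounding code logs through it.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch only; names are placeholders, not the PR's code.
final class RetrievalLoopSketch {
    private static final Logger log = LoggerFactory.getLogger(RetrievalLoopSketch.class);

    void runOnce() {
        try {
            attemptRetrieval();
        } catch (InterruptedException ie) {
            // Interruption is treated as a shutdown signal rather than an error.
            log.info("Thread was interrupted, assuming the publisher is shutting down.");
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            // Anything else is unexpected and logged as an error.
            log.error("Unexpected exception was thrown in the prefetch loop.", t);
        }
    }

    private void attemptRetrieval() throws InterruptedException {
        Thread.sleep(10L); // stands in for the blocking GetRecords / queue work
    }
}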

Contributor

@joshua-kim left a comment

Left minor comments

@joshua-kim
Contributor

LGTM. Build is failing for flaky reasons again. Approving.

@joshua-kim merged commit 6fd59b3 into awslabs:master Oct 7, 2021
@shanmsac added this to the v2.3.7 milestone Oct 11, 2021
@hamid646m

Hi, a side effect of this change is that KCL is throwing this error:

java.lang.IllegalStateException: Shutdown has been called on the cache, can't accept new requests.
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.throwOnIllegalState(PrefetchRecordsPublisher.java:283)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.peekNextResult(PrefetchRecordsPublisher.java:292)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.drainQueueForRequests(PrefetchRecordsPublisher.java:388)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.makeRetrievalAttempt(PrefetchRecordsPublisher.java:506)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.run(PrefetchRecordsPublisher.java:464)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)

@madagascar22
Contributor Author

This is the expected graceful shutdown scenario, because this fix allows making an intentional shutdown decision instead of interrupting the thread using shutdownNow().

@hamid646m

hamid646m commented Oct 19, 2021

If this is expected behaviour, why is it logging an error and throwing an exception? Surely it should let the worker drain the queue too? Perhaps it should look at isTerminated instead of isShutdown?
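
For reference, the distinction being raised here: isShutdown() becomes true as soon as shutdown() is called, while isTerminated() only becomes true after all running and queued tasks have completed. A small illustrative sketch (not KCL code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: shows when isShutdown() and isTerminated() flip to true.
final class ShutdownStateSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            try {
                Thread.sleep(500L); // stands in for a worker still draining the queue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        executor.shutdown();
        System.out.println("isShutdown:   " + executor.isShutdown());   // true immediately after shutdown()
        System.out.println("isTerminated: " + executor.isTerminated()); // still false while the task is draining

        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("isTerminated: " + executor.isTerminated()); // true once all work has completed
    }
}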
