Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add diagnostic events for logging visibility #559

Merged
merged 9 commits into from
Jun 28, 2019
Merged

Add diagnostic events for logging visibility #559

merged 9 commits into from
Jun 28, 2019

Conversation

micah-jaffe
Copy link
Contributor

@micah-jaffe micah-jaffe commented Jun 21, 2019

Added logging into the executor service for visibility and to help debug complex thread-related issues. Also added an error handler for rejected task exceptions in the RxJava layer to propagate those exceptions up to the KCL layer and log them accordingly.

Per discussion with team we are not providing the flexibility to customize logging or add additional handlers as of now. As a result all new classes in this PR are package-private access.

Example of regular executor state diagnostic:

16:20:31.746 [Thread-1] INFO  s.a.k.c.DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=1, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647)

Example of rejected task logging:

2019-06-26 16:18:35,044 [main] ERROR s.a.k.c.DiagnosticEventLogger [NONE] - Review your thread configuration to prevent task rejections. Until next release, KCL will not be resilient to task rejections. Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=0, largestPoolSize=0, maximumPoolSize=2147483647) 
io.reactivex.exceptions.UndeliverableException: java.util.concurrent.RejectedExecutionException: Test exception.
	at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
	at software.amazon.kinesis.coordinator.SchedulerTest.lambda$testErrorHandlerForUndeliverableAsyncTaskExceptions$1(SchedulerTest.java:286)
	at org.mockito.internal.stubbing.StubbedInvocationMatcher.answer(StubbedInvocationMatcher.java:34)
	at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:91)
	at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
	at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:38)
	at org.mockito.internal.creation.cglib.MethodInterceptorFilter.intercept(MethodInterceptorFilter.java:59)
	at software.amazon.kinesis.coordinator.Scheduler$$EnhancerByMockitoWithCGLIB$$1999942.runProcessLoop(<generated>)
	at software.amazon.kinesis.coordinator.SchedulerTest.testErrorHandlerForUndeliverableAsyncTaskExceptions(SchedulerTest.java:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
	at org.mockito.internal.runners.JUnit45AndHigherRunnerImpl.run(JUnit45AndHigherRunnerImpl.java:37)
	at org.mockito.runners.MockitoJUnitRunner.run(MockitoJUnitRunner.java:62)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.util.concurrent.RejectedExecutionException: Test exception.
	... 33 common frames omitted

Example logs emitted every 30 seconds from running KCL application:

2019-07-01 17:00:17,929 [KinesisTester-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=25, largestPoolSize=51, maximumPoolSize=2147483647) 
2019-07-01 17:00:47,950 [KinesisTester-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=25, largestPoolSize=51, maximumPoolSize=2147483647) 
2019-07-01 17:01:17,955 [KinesisTester-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=25, largestPoolSize=51, maximumPoolSize=2147483647) 
2019-07-01 17:01:47,967 [KinesisTester-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=25, largestPoolSize=51, maximumPoolSize=2147483647) 
2019-07-01 17:02:17,972 [KinesisTester-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=25, largestPoolSize=51, maximumPoolSize=2147483647) 
2019-07-01 17:02:47,977 [KinesisTester-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=25, largestPoolSize=51, maximumPoolSize=2147483647) 
2019-07-01 17:03:17,981 [KinesisTester-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=25, largestPoolSize=51, maximumPoolSize=2147483647) 
2019-07-01 17:03:47,986 [KinesisTester-0000] INFO  s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=25, largestPoolSize=51, maximumPoolSize=2147483647) 

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Copy link
Contributor

@ashwing ashwing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initial feedback. Yet to review unit test cases.

Copy link
Contributor

@sahilpalvia sahilpalvia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a few comments. One more comment, this could do without a new java package called logging.

@@ -167,6 +170,9 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig,
this.shardConsumerDispatchPollIntervalMillis = this.coordinatorConfig.shardConsumerDispatchPollIntervalMillis();
this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis();
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
this.diagnosticEventHandler = new DefaultDiagnosticEventHandler();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could come from the config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we don't want the customer to modify this handler, should this still go in the config?

* @param handler This method accepts a handler param solely for testing. All non-test calls to this method will
* have a null input.
*/
@VisibleForTesting

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be tested as part of runProcessLoop() method?
We want to make sure this is invoked from runProcessLoop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the unit test for this method to call RxJavaPlugins.onError() in runProcessLoop() on a scheduler spy to verify that the error handler works as expected

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we get the scheduler take an EventFactory, then we can mock the event vended to the scheduler, and assert that .accept is invoked on the event upon failure.

That way, this method can be marked private, and no need to support custom handler as an argument to this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be able to mock the EventFactory for testability, we would need a public interface for setting/modifying the EventFactory (either in configs or a setter). Currently we don't want users to be able to interact with this factory so this would only solve the problem if we introduced a 'backdoor' non-public setter for testing, which still isn't really designing for testability. Unless there's another way to inject a mock that doesn't rely on a public gateway?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to use a factory by overloading with a protected constructor for testing

Copy link

@yasemin-amzn yasemin-amzn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Micah. LGTM, mostly code organization related feedback.

Copy link

@yasemin-amzn yasemin-amzn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

followup up on a unittest comment.

Copy link
Contributor

@sahilpalvia sahilpalvia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good mostly, just a couple of minor comments. Nothing blocking.

@Slf4j
@KinesisClientInternalApi
class ExecutorStateEvent implements DiagnosticEvent {
private final String MESSAGE = "Current thread pool executor state: ";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker: Seems like this is only used in one place, so doesn't need to be a class variable.

public String message() {
    return String.format("Current threadpool executor state: %s", toString());
}

@micah-jaffe micah-jaffe added this to the v2.2.1 milestone Jun 27, 2019
Copy link

@yasemin-amzn yasemin-amzn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

@sahilpalvia sahilpalvia merged commit fa72cf1 into awslabs:master Jun 28, 2019
@micah-jaffe micah-jaffe deleted the add-logging branch June 28, 2019 19:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants