APEXCORE-505 Heartbeat loop was blocked waiting for operator activati…#405
APEXCORE-505 Heartbeat loop was blocked waiting for operator activati…#405asfgit merged 1 commit intoapache:masterfrom
Conversation
7611319 to
afb1e79
Compare
vrozov
left a comment
There was a problem hiding this comment.
Please add a unit test that checks that it is OK to activate BufferServerSubscriber before an operator that consumes from the reservoir.
afb1e79 to
0b3bb27
Compare
|
@vrozov I have added the unit test. Please review. |
|
Please avoid code duplication. |
|
It is necessary to test operator activation sequence. |
|
|
||
| @Test | ||
| public void testBufferServerSubscriberActivationBeforeOperator() throws InterruptedException, IOException | ||
| { |
There was a problem hiding this comment.
delay this declaration till it can be initialized
| @Test | ||
| public void testBufferServerSubscriberActivationBeforeOperator() throws InterruptedException, IOException | ||
| { | ||
| int bufferServerPort = 0; |
There was a problem hiding this comment.
delay declaration till it can be initialized
|
|
||
| ((DefaultEventLoop)eventloop).start(); | ||
| bufferServer = new Server(0); // find random port | ||
| InetSocketAddress bindAddr = bufferServer.run(eventloop); |
There was a problem hiding this comment.
final int bufferServerPort = bufferServer.run(eventloop).getPort();
| final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>(); | ||
| final ArrayList<Object> list = new ArrayList<Object>(); | ||
|
|
||
| GenericOperator go = new GenericOperator(); |
There was a problem hiding this comment.
Please check if OperatorContext is required or it can be null
There was a problem hiding this comment.
The context cannot be null, in the Node activation values are read from context.
| issContext = new StreamContext(streamName); | ||
| issContext.setSourceId(upstreamNodeId); | ||
| issContext.setSinkId(downstreamNodeId); | ||
| issContext.setFinishedWindowId(-1); |
There was a problem hiding this comment.
Done. Using "localhost" to create the socket address.
| oss.put(beginWindow2); | ||
| oss.put(endWindow2); | ||
| oss.put(beginWindow3); | ||
| oss.put(endWindow3); |
There was a problem hiding this comment.
The purpose of the test is to check that BufferServerSubscriber can be activated and process tuples without a loss even if it is activated before the operator is setup and starts processing tuples. There is no guarantee that Publisher actually delivers tuples to the buffer server and buffer server sends them to the Subscriber in the test.
1e673f3 to
e50f090
Compare
| SweepableReservoir tupleWait = iss.acquireReservoir("testReservoir2", 10); | ||
|
|
||
| iss.activate(issContext); | ||
|
|
There was a problem hiding this comment.
Should the condition be reversed?
There was a problem hiding this comment.
Yes, will change it.
…on, reason for this is that Stream activation(Only BufferServerSubscriber and WindowGenerator) waits for operator activation in heartbeat thread. After analysis and sanity testing, we don't see the need to have the synchronization between operator and stream activation
e50f090 to
fc3246e
Compare
…on, the reason for this is that Stream activation(Only BufferServerSubscriber and WindowGenerator) waits for operator activation in a heartbeat thread. There is no need to have this synchronization, as Tuples are pulled from the queues by the operators.
@vrozov @tweise please review