
KAFKA-3703: Graceful close for consumers and producer with acks=0 #1836

Closed
wants to merge 11 commits

Conversation

rajinisivaram
Contributor

Process requests received from channels before they were closed. For consumers, wait for coordinator requests to complete before returning from close.

@rajinisivaram
Contributor Author

@hachikuji @ijuma The old PR was not useful for producers and it wasn't fully fixing consumers either. So here is a new PR. The changes are:

  1. Consumers now wait for pending coordinator requests to complete. So this should take care of commits and leave-group.
  2. Broker now handles completed receives from channels when a channel is closed. With producer acks=0 when requests are immediately followed by close + exit, this change processes the last messages received instead of dropping them.

For producers with acks=0, it may still be possible to lose data if the producer exits immediately after sending a lot of messages because SO_LINGER doesn't linger in non-blocking mode. But I haven't been able to recreate the problem with this PR.
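The bounded wait for pending coordinator requests can be sketched roughly as follows. This is a hypothetical model with illustrative names (including the 5-second timeout that was added later in this review); Kafka's actual NetworkClient/ConsumerNetworkClient APIs differ.

```java
// Hypothetical sketch of waiting for pending coordinator requests during
// consumer close, with a bounded timeout. The Client interface and names
// are illustrative, not Kafka's actual API.
public class CloseWait {
    static final long CLOSE_TIMEOUT_MS = 5000;

    interface Client {
        int pendingRequestCount(String node); // in-flight + unsent requests to this node
        void poll(long timeoutMs);            // process I/O for up to timeoutMs
    }

    /**
     * Polls until requests to the coordinator drain or the timeout elapses.
     * Returns true if all pending requests (e.g. OffsetCommit, LeaveGroup)
     * completed before the deadline.
     */
    static boolean awaitPendingRequests(Client client, String coordinator, long nowMs) {
        long endMs = nowMs + CLOSE_TIMEOUT_MS;
        while (client.pendingRequestCount(coordinator) > 0) {
            long remainingMs = endMs - System.currentTimeMillis();
            if (remainingMs <= 0)
                return false; // best-effort: give up rather than block close() forever
            client.poll(remainingMs);
        }
        return true;
    }
}
```

The timeout keeps close() from hanging indefinitely, since both OffsetCommit and LeaveGroup are best-effort during shutdown.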

@@ -406,6 +406,10 @@ public void close() {
} finally {
super.close();
}
Node coordinator;
while ((coordinator = coordinator()) != null && client.pendingRequestCount(coordinator) > 0) {
Contributor

I think you could use client.awaitPendingRequests() to achieve the same thing. I'm wondering if we should have a hard-coded timeout. In the worst case, we'll only block as long as the request timeout, but since both the OffsetCommit and LeaveGroup are "best effort" and should be handled relatively quickly, perhaps it would be better to timeout sooner. What do you think?

Contributor Author

@hachikuji Thank you for the review. Yes, I was in two minds about adding a hard-coded timeout. But it does make sense to have one, so I have added a timeout of 5 seconds.

Contributor

@hachikuji hachikuji left a comment

@rajinisivaram Sorry for the delay. Left a couple minor comments, but this mostly looks good to me. The change seems straightforward, but it might be good to get another look from @ijuma or @junrao.

@@ -403,6 +405,16 @@ public void close() {
client.disableWakeups();
try {
maybeAutoCommitOffsetsSync();

Node coordinator;
long endTimeMs = System.currentTimeMillis() + CLOSE_TIMEOUT_MS;
Contributor

We should probably use time.milliseconds(). Same for the other call in the loop below.

Contributor Author

Done. I also had to clear outstanding requests in a few consumer tests that use a mock timer without auto-tick.

if (remainingTimeMs > 0)
client.poll(remainingTimeMs);
else
break;
Contributor

Perhaps it would be helpful to log a warning in this case?

Contributor Author

Done.

while (!deque.isEmpty()) {
addToCompletedReceives(channel, deque);
}
closedChannels.put(channel.id(), channel);
Contributor

@hachikuji hachikuji Oct 18, 2016

It is probably worth a comment somewhere explaining why we bother to keep track of closed channels (especially in regard to the acks=0 case).

Contributor Author

Done.

@rajinisivaram
Contributor Author

@hachikuji Thank you for the review. Have addressed your comments and rebased to the latest trunk level. @ijuma Can you take a look as well please? Thank you.

@ijuma
Contributor

ijuma commented Oct 24, 2016

@rajinisivaram, will take a look tomorrow. Sorry for the delay.

@@ -831,6 +831,7 @@ public void testSubscriptionChangesWithAutoCommitEnabled() {
assertTrue(consumer.subscription().isEmpty());
assertTrue(consumer.assignment().isEmpty());

client.requests().clear();
Contributor

I assume we need the clear call to avoid having to wait until the close timeout? Do we have any test cases that trigger the new code path?

Contributor Author

@ijuma Thank you for the review. Yes, clear is required to avoid close() waiting for the timeout. Since these tests use MockTime, the tests would otherwise need to be updated to change time during close(). With this change, the timeout code path was no longer being triggered in unit tests, so I have added a couple of tests.

@@ -486,12 +487,14 @@ private[kafka] class Processor(val id: Int,
private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>
try {
val channel = selector.channel(receive.source)
val openChannel = selector.channel(receive.source)
val channel = if (openChannel != null) openChannel else selector.closedChannel(receive.source)
Contributor

Nit: this may be a little better like:

val socketAddress = {
  val channel = if (openChannel != null) openChannel else selector.closedChannel(receive.source)
  channel.socketAddress
}

The reason is that some methods are unsafe to call on a closed channel. This way we make it clear that we are only calling the safe socketAddress on a potentially closed channel. Does that make sense?

Contributor

Oh, I just realised that we also need the principal from the channel.

By the way, we had the following issue reported a while back: https://issues.apache.org/jira/browse/KAFKA-3986, but I could not figure out under which cases we would end up with a null channel here. If you have, I'd be interested to know. :)

Contributor Author

@ijuma Thank you, I have updated the code. I had seen that JIRA, but couldn't figure out the scenario either.

val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
channel.socketAddress)
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
requestChannel.sendRequest(req)
selector.mute(receive.source)
if (openChannel != null)
selector.mute(openChannel.id)
Contributor

What is the reasoning for this change?

Contributor Author

The check was necessary to avoid muting closed channels, but the other change was unnecessary, so I have reverted it. Thanks.

@rajinisivaram
Contributor Author

@hachikuji @ijuma Is any more work required on this PR? Thank you...

@hachikuji
Contributor

@rajinisivaram Not from me. I was going to let Ismael take a final look before committing. I think he's back next week.

@rajinisivaram
Contributor Author

@hachikuji Thank you...

@ijuma
Contributor

ijuma commented Nov 15, 2016

I am back, but still catching up. Have this near the top of my review pile, so should get to it today or tomorrow.

Contributor

@ijuma ijuma left a comment

Finally got around to this, left some simple suggestions. Most of them are comments, but shutting down the executor in one of the tests is important.

client.prepareResponseFrom(fetchResponse(tp0, 1, 0), node);
consumer.poll(0);

// Initiate close() after a commit request on another thread
Contributor

The consumer is meant to be used from a single thread, and it throws an exception if accessed from multiple threads simultaneously. Because of the acquire/release pair, it's probably safe to call it from a different thread given the current implementation, but probably worth adding a comment here.

Contributor Author

Done.

consumer.poll(0);

// Initiate close() after a commit request on another thread
ExecutorService executor = Executors.newSingleThreadExecutor();
Contributor

We need to shutdown the executor, right?

Contributor Author

Yes, done.

channel.socketAddress)
val openChannel = selector.channel(receive.source)
val session = {
val channel = if (openChannel != null) openChannel else selector.closedChannel(receive.source)
Contributor

Probably worth adding a comment noting that we should only call methods that are safe to call on a closed channel (or something to that effect).

Contributor Author

Done.

// may be processed. For example, when producer with acks=0 sends some records and
// closes its connections, a single poll() in the broker may receive records and
// handle close(). Closed channels are retained until the current poll() processing
// completes to enable all records received on the channel to be processed.
Contributor

Probably worth saying something like: we deviate from the norm that at most a single completed receive is returned per channel in each poll invocation for this case.

Contributor Author

Have updated the code, see note below.

Deque<NetworkReceive> deque = this.stagedReceives.remove(channel);
if (deque != null) {
while (!deque.isEmpty()) {
addToCompletedReceives(channel, deque);
Contributor

@junrao junrao Nov 17, 2016

Hmm, not sure about this code. Currently, each selector.poll() is guaranteed to only return at most 1 receive for a given channel. This is important to guarantee that requests from the same channel are processed on the broker in the order they are sent. With this change, SocketServer can potentially put more than one outstanding request from a channel in the request queue. Since the requests could be handled by different request handler threads, there is no guarantee in which order those requests will be processed, which will break our message ordering semantic.

Contributor

Good point. It would be good to expand the following text in poll to explain why we only ever return one completedReceive per channel. It is something that has come up a few times.

To overcome this issue we added the "stagedReceives" map which contains a per-channel deque. When we are reading a channel we read as many responses as we can and store them into "stagedReceives", and pop one response during the poll to add to the completedReceives. If there are any active channels in "stagedReceives" we set "timeout" to 0, pop a response and add it to the completedReceives.

Contributor Author

Very good point, had completely missed that. I have updated the code to handle one receive at a time. Added comment and tests.
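To make the ordering constraint concrete, here is a toy model of per-channel staged receives where each poll pops at most one receive per channel, so a connection's requests keep their send order. Names are illustrative only, not the actual Selector implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy model of per-channel staged receives (illustrative names): each
// poll() returns at most one receive per channel, so the broker never has
// two requests from the same connection in the request queue at once and
// message ordering is preserved.
public class StagedReceives {
    private final Map<String, Deque<String>> staged = new HashMap<>();

    public void stage(String channelId, String receive) {
        staged.computeIfAbsent(channelId, k -> new ArrayDeque<>()).add(receive);
    }

    /** Pops at most one staged receive per channel into the completed list. */
    public List<String> poll() {
        List<String> completed = new ArrayList<>();
        Iterator<Map.Entry<String, Deque<String>>> it = staged.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Deque<String>> entry = it.next();
            completed.add(entry.getValue().pollFirst());
            if (entry.getValue().isEmpty())
                it.remove(); // channel fully drained
        }
        return completed;
    }

    public boolean hasStaged() {
        return !staged.isEmpty();
    }
}
```

Draining a channel's whole deque in one poll, as the earlier revision did, would hand multiple requests from one connection to concurrent handler threads and break this ordering.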

@@ -64,6 +64,8 @@

private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);

private static final long CLOSE_TIMEOUT_MS = 5000;
Contributor

Should we hard code this or add a new method close(long timeout, TimeUnit timeUnit) like the producer?

Contributor

You mean adding one such method to KafkaConsumer as well?

Contributor

Yes. That will require a KIP.

Contributor Author

I will raise a KIP to add a close with timeout for consistency with the producer. Can we do that in a follow-on JIRA since this is already quite a big change? The hard-coded limit can be temporary until a close() with timeout is implemented.

Contributor

Yes, doing this in a follow-up jira is fine.

Contributor

I also agree that a follow-up is fine.

@rajinisivaram
Contributor Author

@ijuma @junrao Thank you for the reviews. Can you take a look at the updated code? Thank you!

Contributor

@junrao junrao left a comment

@rajinisivaram : Thanks for the patch. LGTM. Just a few minor comments.

return channel;
}

public void outputChannel(WritableByteChannel channel) {
Contributor

Could we add a comment on when this should be used?

Contributor Author

Done.

if (!disconnected)
return transportLayer.isMute();
else
return muted;
Contributor

Do we need the if/else? Could we just return muted?

Contributor Author

Done.

@@ -529,6 +564,13 @@ public KafkaChannel channel(String id) {
}

/**
* Return the channel with the specified id if it was closed during the last poll().
*/
public KafkaChannel closedChannel(String id) {
Contributor

Should this be in the Selectable interface?

Contributor Author

There are currently no references to KafkaChannel in Selectable and channel() is not included in the interface. So I haven't added this.

@@ -40,7 +41,7 @@
private final List<SocketChannel> socketChannels;
private final AcceptorThread acceptorThread;
private final Selector selector;
private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>();
private WritableByteChannel outputChannel;
Contributor

Should this be volatile?

Contributor Author

Yes, added.

Contributor

@ijuma ijuma left a comment

Thanks @rajinisivaram, I think we're close in this tricky PR. :)

I suggest you start a system tests run once you've reviewed the comments from myself and Jun and addressed the ones you think should be addressed. That will increase our confidence.


@@ -447,7 +447,7 @@ public int read(ByteBuffer dst) throws IOException {
netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize());
if (netReadBuffer.remaining() > 0) {
int netread = socketChannel.read(netReadBuffer);
if (netread == 0 && netReadBuffer.position() == 0) return netread;
if (netread == 0 && netReadBuffer.position() == 0) return read;
Contributor

Do the tests in SslTransportLayerTest verify the changes in this file? That is, would there be test failures without the changes in this file?

Contributor Author

This change above that returns the correct number of bytes read is never actually used in Kafka. The return value is just there to implement the interface. Since I was in the code, I thought it was worth fixing anyway (all the other return statements do return read correctly). The other change is required, and the new close test does verify that.

Contributor

Perhaps it's clearer to just return 0?

selector.send(new NetworkSend(node, ByteBuffer.wrap(message)));
do {
selector.poll(0L);
} while (selector.completedSends().size() == 0);
Contributor

Nit: selector.completedSends().isEmpty

Contributor Author

Done.

private KafkaChannel channel(String id) {
KafkaChannel channel = selector.channel(id);
if (channel == null)
channel = selector.closedChannel(id);
Contributor

Nit: seems like we can just return instead of mutating the channel variable.

Contributor Author

Done.

@@ -509,7 +542,9 @@ public boolean isChannelReady(String id) {
private KafkaChannel channelOrFail(String id) {
KafkaChannel channel = this.channels.get(id);
if (channel == null)
throw new IllegalStateException("Attempt to retrieve channel for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet());
channel = this.closedChannels.get(id);
if (channel == null)
Contributor

Is it safe to do this given that this method is called from send? I think I'd prefer to name this in a way that was obvious that it can return open or closed channels (unlike channel).

Contributor Author

Send failures on closingChannels are handled as ungraceful close, just as with normal channels, so the code is safe. But I agree it could be made more obvious. I have changed the method to take a flag and added a separate code path in send to make handling of closing channels more obvious.

if (deque.isEmpty())
this.stagedReceives.remove(channel);
}
closedChannels.put(channel.id(), channel);
Contributor

It seems like these channels are not actually closed. Maybe they should be called closingChannels?

Contributor Author

Agree, done.

channel.socketAddress)
val openChannel = selector.channel(receive.source)
val session = {
// Only methods that are safe to call on a closed channel should be invoked on 'channel'.
Contributor

Maybe closed should be replaced by disconnected since the channel is not really closed (if I have read the code correctly).

Contributor Author

Done.

@rajinisivaram
Contributor Author

@ijuma @junrao Thank you for the reviews. Have addressed the comments. I have scheduled a system test run, will add a link once it starts running.

Contributor

@ijuma ijuma left a comment

Thanks for the update, left a couple of comments.

@@ -506,10 +544,12 @@ public boolean isChannelReady(String id) {
return channel != null && channel.ready();
}

private KafkaChannel channelOrFail(String id) {
private KafkaChannel channelOrFail(String id, boolean maybeClosing) {
Contributor

The boolean parameter is not being used.

Contributor Author

Oops, fixed now.

this.channels.remove(channel.id());
this.sensors.connectionClosed.record();

if (idleExpiryManager != null)
idleExpiryManager.remove(channel.id());
}

private void doClose(KafkaChannel channel) {
Contributor

@ijuma ijuma Nov 18, 2016

There's a slight difference in behaviour for people who close a channel directly. Before this PR, the channel would not appear in disconnected after a poll where now it will. Have you verified that this doesn't cause any issues?

Contributor Author

There is currently code that handles close() and disconnect differently, since close() did not add channels to disconnected. Working on a fix.

Contributor Author

@ijuma Have updated the code to retain the current semantics for disconnected. When close() is called locally, an immediate close is performed without adding to disconnected list since callers expect close to be complete and handle disconnections on return. In other cases, the channel is added to disconnected when the channel is actually closed.

Contributor

Thanks @rajinisivaram, this seems to make sense to me.

String connectionId = send.destination();
KafkaChannel closingChannel = closingChannels.remove(connectionId);
if (closingChannel != null)
doClose(closingChannel, true);
Contributor

Hmm, I wonder if we should change private void close(KafkaChannel channel, boolean processOutstanding) to take an additional notifyDisconnect parameter and then have an overload that defaults notifyDisconnect to be the same as processOutstanding. What do you think?

Contributor Author

I started with a close with three parameters exactly as you suggest. And then changed it because I thought it looked a bit confusing. The overload does make it clearer, so I have updated the code. Thank you.
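The overload idea being discussed can be sketched like this. Signatures are hypothetical; the real Selector.close takes a KafkaChannel and does considerably more.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the suggested overload: the two-argument close
// defaults notifyDisconnect to processOutstanding, while the three-argument
// form lets callers decouple the two. Names are illustrative only.
public class CloseOverload {
    final List<String> disconnected = new ArrayList<>();

    void close(String channelId, boolean processOutstanding) {
        // By default, channels whose outstanding receives we process are
        // also the ones reported as disconnected on a later poll.
        close(channelId, processOutstanding, processOutstanding);
    }

    void close(String channelId, boolean processOutstanding, boolean notifyDisconnect) {
        // processOutstanding would drain staged receives before really closing.
        if (notifyDisconnect)
            disconnected.add(channelId);
    }
}
```

The overload keeps the common call sites short while making the unusual combinations explicit at the few places that need them.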

Contributor

The reason I suggested that is that I thought we could then simplify the code here. I thought we could call close(closingChannel, false, true), but looking closer, I think I was wrong. If something is in closingChannels, then we have already called disconnect. So, maybe this idea doesn't gain us anything, just an additional private method that doesn't get used by anything.

Contributor

I'm not too happy about having some of this logic in this method. How about we introduce a closeClosingChannel private method that does what we're doing here? It's a simple method, but it seems clearer from a semantic perspective. We could do something similar for the logic that updates closingChannels in clear(). What do you think?

Contributor Author

The problem is that clear() is iterating through closingChannels, so the code that removes a channel from closingChannels needs to use the iterator in clear(). The common code is then just the call to doClose; not sure that is worth a separate method.

Contributor Author

I have moved the handling of close to clear(), so closingChannels is updated only in clear(). Can you check if it is any better? Thank you.

@rajinisivaram
Contributor Author

System tests are running here: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/614/. They don't contain the latest changes.

}
closingChannels.put(channel.id(), channel);
} else
doClose(channel, notifyDisconnect);
this.channels.remove(channel.id());
this.sensors.connectionClosed.record();
Contributor

Should this be in doClose?

Contributor Author

Moved.

@ijuma
Contributor

ijuma commented Nov 18, 2016

Thanks @rajinisivaram, LGTM. I'll wait for the system tests and for @junrao to take a look after the latest changes.

@rajinisivaram
Contributor Author

@ijuma Thank you. The system test run that I started this morning is half-way through and looks good so far. I will leave it to complete. I have scheduled one more run since the changes from today were significant.

close(channel);
String connectionId = send.destination();
if (closingChannels.containsKey(connectionId))
this.failedSends.add(connectionId);
Contributor

Hmm, it seems that failedSends is never used in the caller to Selector. Let's say the producer sends 2 outstanding produce requests with ack=1 and then closes the socket. A broker gets the first request, processes it and tries to send a response. The send fails and is added to failedSends. However, since SocketServer never checks failedSends, it doesn't know the send fails or the channel is disconnected. So, this socket will never be closed by the broker?

Contributor Author

failedSends are handled by clear(), which is invoked at the start of poll(). Closing channels are closed once they have no staged receives or if they appear in failedSends. They are also added to the disconnected list.
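The lifecycle described here can be modeled roughly as follows. This is a simplified, hypothetical version of the cleanup that happens at the start of poll(); field and method names are illustrative, not the actual Selector code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of the cleanup at the start of poll():
// a channel in "closing" state is fully closed (and reported disconnected)
// once its staged receives are drained or a send to it has failed.
public class ClosingChannels {
    final Map<String, Deque<String>> stagedReceives = new HashMap<>();
    final Map<String, String> closing = new HashMap<>(); // channel id -> channel
    final Set<String> failedSends = new HashSet<>();
    final List<String> disconnected = new ArrayList<>();

    /** Plays the role of Selector.clear(): invoked at the start of each poll(). */
    void clear() {
        Iterator<Map.Entry<String, String>> it = closing.entrySet().iterator();
        while (it.hasNext()) {
            String id = it.next().getKey();
            boolean sendFailed = failedSends.remove(id);
            boolean drained = !stagedReceives.containsKey(id);
            if (drained || sendFailed) {
                it.remove();
                disconnected.add(id); // close completes only after receives are processed
            }
        }
        failedSends.clear();
    }
}
```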

* Return the channel with the specified id if it was disconnected, but not yet closed
* since there are outstanding messages to be processed.
*/
public KafkaChannel closingChannel(String id) {
Contributor

Since this is a public api that any caller could use, it seems that it should be in the Selectable interface since we may need to mock it during testing?

Contributor Author

At the moment you can't get hold of KafkaChannel using the Selectable interface. If closingChannel is added to the interface, perhaps channel(String id) needs to be as well? Not sure if you would want one but not the other.


@@ -488,7 +488,9 @@ public int read(ByteBuffer dst) throws IOException {
}
break;
} else if (unwrapResult.getStatus() == Status.CLOSED) {
throw new EOFException();
// If data has been read and unwrapped, return the data. Close will be handled on the next poll.
if (!appReadBuffer.hasRemaining())
Contributor

Hmm, not sure about this check. Is it possible for appReadBuffer to have remaining bytes while unwrapResult's status is closed? If so, that status will remain in closed in the while loop and we will never get a chance to read the remaining bytes in appReadBuffer?

Contributor Author

Producer sends a message to Kafka with acks=0 (4-byte size + 100 bytes of data) and closes the socket. Broker reads and unwraps the data. The first read that unwraps the data unwraps all the received data, and the unwrap result has status CLOSED. appReadBuffer now has 104 bytes, but the first read() only requested the 4-byte size. We want read() to return the 4 bytes rather than throw EOF. The next read() doesn't need to unwrap because the 100 bytes requested are already in appReadBuffer; that goes through the first readFromAppBuffer(dst) in the code. After the data is processed, the next attempt to read will generate EOFException.

The previous comment is not letting me add a response. So I will add that here too. The value returned there was netread=0. I changed it to read because read could be > 0 if bytes remaining in appReadBuffer were copied into dst before trying to read more data.

Contributor Author

Sorry, I just realized that the check worked for the wrong reasons, fixed it.
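The behaviour described in this thread, returning already-unwrapped plaintext before surfacing end-of-stream, can be sketched like this. It is a simplified model: the real SslTransportLayer throws EOFException, which is represented here by the conventional -1 return, and the class names are illustrative.

```java
import java.nio.ByteBuffer;

// Simplified, hypothetical model of an SSL read path: plaintext already
// unwrapped into appReadBuffer must be handed to the caller even if the
// SSL engine has reported CLOSED; end-of-stream is signalled only after
// the buffer is drained (-1 stands in for the EOFException the real code throws).
public class BufferedRead {
    private final ByteBuffer appReadBuffer;
    private final boolean peerClosed;

    public BufferedRead(byte[] unwrappedPlaintext, boolean peerClosed) {
        this.appReadBuffer = ByteBuffer.wrap(unwrappedPlaintext);
        this.peerClosed = peerClosed;
    }

    /** Copies buffered plaintext into dst; returns -1 only once the buffer is empty. */
    public int read(ByteBuffer dst) {
        int read = 0;
        while (dst.hasRemaining() && appReadBuffer.hasRemaining()) {
            dst.put(appReadBuffer.get());
            read++;
        }
        if (read == 0 && peerClosed)
            return -1; // nothing buffered and the peer closed: end of stream
        return read;
    }
}
```

This mirrors the acks=0 scenario above: the broker can still hand the last buffered bytes to the request path even though the client has already closed the connection.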

@junrao
Copy link
Contributor

junrao commented Nov 18, 2016

@rajinisivaram : Thanks for the latest patch and explanation. LGTM. From my perspective, just need to wait for system tests to pass.

@ijuma
Contributor

ijuma commented Nov 19, 2016

The first system tests run passed, the second is in progress: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/615/

@rajinisivaram
Contributor Author

There was one test failure (StreamsSmokeTest.test_streams) in https://jenkins.confluent.io/job/system-test-kafka-branch-builder/615, but I think the failure is unrelated since yesterday's system test run on trunk also had the same failure (https://jenkins.confluent.io/job/system-test-kafka/413).

@ijuma
Contributor

ijuma commented Nov 19, 2016

Yes, that test seems to fail transiently and there's a PR for it: #2149

Merging this to trunk.

@asfgit asfgit closed this in e53baba Nov 19, 2016
@ciprianpascu

Hi,
It seems this fix is not present in the current (0.10.1.1) release. Can you explain why? Also, does anyone know which release will include it?
