Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed wakeup bug in NioOutboundPipeline #14831

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,7 +34,7 @@
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.hazelcast.internal.metrics.ProbeLevel.DEBUG;
import static com.hazelcast.internal.networking.HandlerStatus.CLEAN;
Expand All @@ -52,6 +52,12 @@ public final class NioOutboundPipeline
extends NioPipeline
implements Supplier<OutboundFrame>, OutboundPipeline {

private enum State {
UNSCHEDULED,
pveentjer marked this conversation as resolved.
Show resolved Hide resolved
SCHEDULED,
BLOCKED
}

@SuppressWarnings("checkstyle:visibilitymodifier")
@Probe(name = "writeQueueSize")
public final Queue<OutboundFrame> writeQueue = new ConcurrentLinkedQueue<OutboundFrame>();
Expand All @@ -62,7 +68,7 @@ public final class NioOutboundPipeline
private OutboundHandler[] handlers = new OutboundHandler[0];
private ByteBuffer sendBuffer;

private final AtomicBoolean scheduled = new AtomicBoolean(true);
private final AtomicReference<State> scheduled = new AtomicReference<State>(State.SCHEDULED);
@Probe(name = "bytesWritten")
private final SwCounter bytesWritten = newSwCounter();
@Probe(name = "normalFramesWritten")
Expand Down Expand Up @@ -137,8 +143,8 @@ private long idleTimeMs() {
}

@Probe(level = DEBUG)
private long isScheduled() {
return scheduled.get() ? 1 : 0;
private long scheduled() {
return scheduled.get().ordinal();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is rather unconventional. Any reason we can't use something like return scheduled.get() == State.SCHEDULED ? 1 : 0?
I'm thinking if someone changes the order. The value of this method returns as well.

}

public void write(OutboundFrame frame) {
Expand Down Expand Up @@ -176,7 +182,7 @@ public OutboundFrame get() {
* If the OutboundHandler already is scheduled, the call is ignored.
*/
private void schedule() {
if (scheduled.get() || !scheduled.compareAndSet(false, true)) {
if (scheduled.get() == State.SCHEDULED || !scheduled.compareAndSet(State.UNSCHEDULED, State.SCHEDULED)) {
// So this pipeline is still scheduled, we don't need to schedule it again
if (writeThroughEnabled) {
concurrencyDetection.onDetected();
Expand Down Expand Up @@ -219,6 +225,7 @@ public void process() throws Exception {

flushToSocket();


if (migrationRequested()) {
startMigration();
// we leave this method and the NioOutboundPipeline remains scheduled.
Expand All @@ -242,6 +249,7 @@ public void process() throws Exception {
case BLOCKED:
// pipeline is blocked; no point in receiving OP_WRITE events.
unregisterOp(OP_WRITE);
scheduled.set(State.BLOCKED);
break;
default:
throw new IllegalStateException();
Expand Down Expand Up @@ -281,7 +289,7 @@ private void unschedule() throws IOException {
// since everything is written, we are not interested anymore in write-events, so lets unsubscribe
unregisterOp(OP_WRITE);
// So the outputBuffer is empty, so we are going to unschedule the pipeline.
scheduled.set(false);
scheduled.set(State.UNSCHEDULED);

if (writeQueue.isEmpty() && priorityWriteQueue.isEmpty()) {
// there are no remaining frames, so we are done.
Expand All @@ -290,7 +298,7 @@ private void unschedule() throws IOException {

// So there are frames, but we just unscheduled ourselves. If we don't try to reschedule, then these
// Frames are at risk not to be send.
if (!scheduled.compareAndSet(false, true)) {
if (!scheduled.compareAndSet(State.UNSCHEDULED, State.SCHEDULED)) {
//someone else managed to schedule this OutboundHandler, so we are done.
if (writeThroughEnabled) {
concurrencyDetection.onDetected();
Expand All @@ -314,7 +322,7 @@ private void flushToSocket() throws IOException {
lastWriteTime = currentTimeMillis();
int written = socketChannel.write(sendBuffer);
bytesWritten.inc(written);
//System.out.println(channel+" bytes written:"+written);
//System.out.println(channel + " bytes written:" + written);
}

void drainWriteQueues() {
Expand Down Expand Up @@ -416,9 +424,15 @@ private String pipelineToString() {

@Override
public OutboundPipeline wakeup() {
// we only want to wake up this pipeline if it isn't scheduled. Otherwise we'll have
// multiple threads potentially messing around with the pipeline.
if (scheduled.compareAndSet(false, true)) {
// // we only want to wake up this pipeline if it isn't scheduled. Otherwise we'll have
// // multiple threads potentially messing around with the pipeline.
State state = scheduled.get();
if (state == State.SCHEDULED) {
return this;
}

// if it is unscheduled or blocked, we need to schedule it.
if (scheduled.compareAndSet(state, State.SCHEDULED)) {
// this will lead to the process method being called and as part of the process,
// the scheduled flag will be unset (if possible).
ownerAddTaskAndWakeup(this);
Expand Down