Skip to content

Commit

Permalink
Use JDK Condition.await for Mutex sleep.
Browse files Browse the repository at this point in the history
Previous logic used the same semaphore to sleep as any other sleep
which interfered with code that expected the Mutex to be the lock
used. The new logic uses the Mutex's JDK Lock, via a Condition, to
do the sleeping.

Because it is not possible for us to change the artificial thread
status we maintain to "sleep" after the lock is released, this
modified logic also introduces a new thread state that indicates
that the native JDK thread state should be used. This gets closer
to avoiding the racing status.

It does not appear to eliminate the race altogether.

Fixes jruby#5863.
  • Loading branch information
headius committed Oct 24, 2019
1 parent cc6bf49 commit 1ed4081
Showing 1 changed file with 55 additions and 15 deletions.
70 changes: 55 additions & 15 deletions core/src/main/java/org/jruby/RubyThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import com.headius.backport9.stack.StackWalker;
Expand Down Expand Up @@ -165,7 +166,7 @@ public class RubyThread extends RubyObject implements ExecutionContext {

/** Thread statuses */
public enum Status {
RUN, SLEEP, ABORTING, DEAD;
RUN, SLEEP, ABORTING, DEAD, NATIVE;

public final ByteList bytes;

Expand Down Expand Up @@ -257,7 +258,7 @@ private void executeInterrupts(ThreadContext context, boolean blockingTiming) {
toKill();
} else {
afterBlockingCall();
if (status.get() == Status.SLEEP) {
if (getStatus() == Status.SLEEP) {
exitSleep();
}
// if it's a Ruby exception, force the cause through
Expand Down Expand Up @@ -937,7 +938,7 @@ public final Map<Object, IRubyObject> getContextVariables() {
}

public boolean isAlive(){
return threadImpl.isAlive() && status.get() != Status.DEAD;
return threadImpl.isAlive() && getStatus() != Status.DEAD;
}

@JRubyMethod
Expand Down Expand Up @@ -1239,7 +1240,7 @@ public static IRubyObject stop(ThreadContext context, IRubyObject receiver) {

synchronized (rubyThread) {
rubyThread.pollThreadEvents(context);
Status oldStatus = rubyThread.status.get();
Status oldStatus = rubyThread.getStatus();
try {
rubyThread.status.set(Status.SLEEP);
rubyThread.wait();
Expand Down Expand Up @@ -1269,12 +1270,12 @@ public static IRubyObject exit(IRubyObject receiver, Block block) {
@JRubyMethod(name = "stop?")
public RubyBoolean stop_p() {
// not valid for "dead" state
return getRuntime().newBoolean(status.get() == Status.SLEEP || status.get() == Status.DEAD);
return getRuntime().newBoolean(getStatus() == Status.SLEEP || getStatus() == Status.DEAD);
}

@JRubyMethod
public synchronized RubyThread wakeup() {
if(!threadImpl.isAlive() && status.get() == Status.DEAD) {
if(!threadImpl.isAlive() && getStatus() == Status.DEAD) {
throw getRuntime().newThreadError("killed thread");
}

Expand Down Expand Up @@ -1469,7 +1470,7 @@ public IRubyObject status() { // not used

@JRubyMethod
public IRubyObject status(ThreadContext context) {
final Status status = this.status.get();
final Status status = getStatus();
if (threadImpl.isAlive() && status != Status.DEAD) { // isAlive()
return context.runtime.getThreadStatus(status);
}
Expand Down Expand Up @@ -1568,18 +1569,24 @@ public void executeBlockingTask(BlockingTask task) throws InterruptedException {
}

public <Data, Return> Return executeTask(ThreadContext context, Data data, Task<Data, Return> task) throws InterruptedException {
return executeTask(context, data, Status.SLEEP, task);
}

public <Data, Return> Return executeTask(ThreadContext context, Data data, Status status, Task<Data, Return> task) throws InterruptedException {
Status oldStatus = this.status.get();
try {
this.unblockArg = data;
this.unblockFunc = task;

// check for interrupt before going into blocking call
pollThreadEvents(context);

this.status.set(status);
enterSleep();

return task.run(context, data);
} finally {
exitSleep();
this.status.set(oldStatus);
this.unblockFunc = null;
this.unblockArg = null;
pollThreadEvents(context);
Expand All @@ -1591,11 +1598,34 @@ public void enterSleep() {
}

public void exitSleep() {
if (status.get() != Status.ABORTING) {
if (getStatus() != Status.ABORTING) {
status.set(Status.RUN);
}
}

private Status getStatus() {
Status status = this.status.get();

if (status != Status.NATIVE) return status;

return nativeStatus();
}

private Status nativeStatus() {
switch (getNativeThread().getState()) {
case NEW:
case RUNNABLE:
default:
return Status.RUN;
case BLOCKED:
case WAITING:
case TIMED_WAITING:
return Status.SLEEP;
case TERMINATED:
return Status.DEAD;
}
}

@JRubyMethod(name = {"kill", "exit", "terminate"})
public IRubyObject kill() {
Ruby runtime = getRuntime();
Expand Down Expand Up @@ -2199,12 +2229,22 @@ public void sleep(Lock lock) throws InterruptedException {
*/
public void sleep(Lock lock, long millis) throws InterruptedException {
assert Thread.currentThread() == getNativeThread();
try {
unlock(lock);
sleep(millis);
} finally {
lock(lock);
}
executeTask(getContext(), lock.newCondition(), Status.NATIVE, new Task<Condition, Object>() {
@Override
public Object run(ThreadContext context, Condition condition) throws InterruptedException {
if (millis == 0) {
condition.await();
} else {
condition.await(millis, TimeUnit.MILLISECONDS);
}
return null;
}

@Override
public void wakeup(RubyThread thread, Condition condition) {
thread.getNativeThread().interrupt();
}
});
}

private String identityString() {
Expand Down

0 comments on commit 1ed4081

Please sign in to comment.